Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.queryengine.common;

import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
Expand All @@ -43,6 +44,7 @@
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.memory.NotThreadSafeMemoryReservationManager;
import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.read_tsfile.ExternalTsFileQueryResource;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExplainOutputFormat;
Expand All @@ -61,6 +63,7 @@
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -142,6 +145,21 @@
private boolean releaseSchemaTreeAfterAnalyzing = true;
private LongConsumer reserveMemoryForSchemaTreeFunc = null;

private boolean reservingMemoryForSchemaTree = false;

private boolean resultSetColumnMemoryTrackingEnabled = false;
private boolean alignByDeviceForResultSetColumnTracking = false;
private long seriesLimitForResultSetColumnTracking = 0;
private long seriesOffsetForResultSetColumnTracking = 0;
private long matchedSourceColumnsForResultSet = 0;
private long expandedSourceColumnsForResultSet = 0;
private long sourceColumnMemoryCostForResultSet = 0;
private long generatedResultSetColumns = 0;
private long generatedResultSetColumnMemoryCost = 0;
private long schemaFetchEstimatedMemoryCost = 0;
private long schemaFetchReservedMemoryCost = 0;
private long schemaFetchDeserializedColumnCount = 0;

private boolean userQuery = false;

/**
Expand Down Expand Up @@ -218,8 +236,17 @@
if (reserveMemoryForSchemaTreeFunc == null) {
return;
}
reserveMemoryForSchemaTreeFunc.accept(memoryCost);
schemaFetchEstimatedMemoryCost += memoryCost;
reservingMemoryForSchemaTree = true;
try {
reserveMemoryForSchemaTreeFunc.accept(memoryCost);
} catch (MemoryNotEnoughException e) {
throw enrichSchemaFetchMemoryNotEnoughException(e, memoryCost);
} finally {
reservingMemoryForSchemaTree = false;
}
this.reservedMemoryCostForSchemaTree += memoryCost;
this.schemaFetchReservedMemoryCost += memoryCost;
}

public void setReleaseSchemaTreeAfterAnalyzing(boolean releaseSchemaTreeAfterAnalyzing) {
Expand All @@ -244,6 +271,7 @@
}
this.initResultNodeContext();
this.releaseAllMemoryReservedForFrontEnd();
this.resetResultSetColumnMemoryTracking();
}

private void cleanUpCte() {
Expand Down Expand Up @@ -540,11 +568,25 @@
* single-threaded manner.
*/
public void reserveMemoryForFrontEnd(final long bytes) {
this.memoryReservationManager.reserveMemoryCumulatively(bytes);
try {
this.memoryReservationManager.reserveMemoryCumulatively(bytes);
} catch (MemoryNotEnoughException e) {
if (reservingMemoryForSchemaTree) {
throw e;
}
throw enrichResultSetColumnMemoryNotEnoughException(e, bytes);
}
}

public void reserveMemoryForFrontEndImmediately() {
this.memoryReservationManager.reserveMemoryImmediately();
try {
this.memoryReservationManager.reserveMemoryImmediately();
} catch (MemoryNotEnoughException e) {
if (reservingMemoryForSchemaTree) {
throw e;
}
throw enrichResultSetColumnMemoryNotEnoughException(e, extractRequestedMemory(e));
}
}

public void releaseAllMemoryReservedForFrontEnd() {
Expand All @@ -555,6 +597,253 @@
this.memoryReservationManager.releaseMemoryCumulatively(bytes);
}

public void initResultSetColumnMemoryTracking(
long seriesLimit, long seriesOffset, boolean alignByDevice) {
resetResultSetColumnMemoryTracking();
resultSetColumnMemoryTrackingEnabled = true;
seriesLimitForResultSetColumnTracking = seriesLimit;
seriesOffsetForResultSetColumnTracking = seriesOffset;
alignByDeviceForResultSetColumnTracking = alignByDevice;
}

public void recordMatchedSourceColumnsForResultSet(long columnCount) {
if (resultSetColumnMemoryTrackingEnabled && columnCount > 0) {
matchedSourceColumnsForResultSet += columnCount;
}
}

public void recordExpandedSourceColumnForResultSet(long memoryCost) {
if (!resultSetColumnMemoryTrackingEnabled) {
return;
}
expandedSourceColumnsForResultSet++;
sourceColumnMemoryCostForResultSet += Math.max(memoryCost, 0);
}

public void recordGeneratedResultSetColumn(long memoryCost) {
if (!resultSetColumnMemoryTrackingEnabled) {
return;
}
generatedResultSetColumns++;
generatedResultSetColumnMemoryCost += Math.max(memoryCost, 0);
}

public void recordSchemaFetchDeserializedColumns(long columnCount) {
if (columnCount > 0) {
schemaFetchDeserializedColumnCount += columnCount;
}
}

private void resetResultSetColumnMemoryTracking() {
resultSetColumnMemoryTrackingEnabled = false;
alignByDeviceForResultSetColumnTracking = false;
seriesLimitForResultSetColumnTracking = 0;
seriesOffsetForResultSetColumnTracking = 0;
matchedSourceColumnsForResultSet = 0;
expandedSourceColumnsForResultSet = 0;
sourceColumnMemoryCostForResultSet = 0;
generatedResultSetColumns = 0;
generatedResultSetColumnMemoryCost = 0;
schemaFetchEstimatedMemoryCost = 0;
schemaFetchReservedMemoryCost = 0;
schemaFetchDeserializedColumnCount = 0;
}

private MemoryNotEnoughException enrichResultSetColumnMemoryNotEnoughException(
MemoryNotEnoughException e, long requestedBytes) {
if (!resultSetColumnMemoryTrackingEnabled
|| (matchedSourceColumnsForResultSet == 0
&& expandedSourceColumnsForResultSet == 0
&& generatedResultSetColumns == 0)) {
return e;
}

long freeBytes = LocalExecutionPlanner.getInstance().getFreeMemoryForOperators();
long shortageBytes =
requestedBytes > 0 && requestedBytes > freeBytes ? requestedBytes - freeBytes : -1;
long exceededColumns = estimateExceededColumns(freeBytes, requestedBytes);

return new MemoryNotEnoughException(
String.format(
Locale.ROOT,
"Not enough memory while analyzing metadata for query result columns. "
+ "The result set has too many columns. "
+ "Before the failure, IoTDB had matched %,d source columns for result-column "
+ "expansion, expanded %,d source columns, and generated %,d result-set columns. "
+ "%s"
+ "Current series pagination is %s. "
+ "Use SLIMIT/SOFFSET to reduce returned series%s, narrow the path pattern, "
+ "or increase query memory%s. "
+ "Memory details: source-column memory for result expansion %s, "
+ "generated-result-column memory %s, requested this time %s, current free memory %s. "

Check warning on line 678 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 103).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8C0IYtd6uwXB75l862&open=AZ8C0IYtd6uwXB75l862&pullRequest=18039
+ "Original error: %s",
matchedSourceColumnsForResultSet,
expandedSourceColumnsForResultSet,
generatedResultSetColumns,
exceededColumns > 0
? String.format(
Locale.ROOT,
"The matched source columns exceed the estimated current memory capacity by "
+ "at least %,d columns. ",
exceededColumns)
: "",
formatSeriesPaginationForDiagnostics(),
alignByDeviceForResultSetColumnTracking
? ""
: ", use ALIGN BY DEVICE to reduce cross-device result columns",
shortageBytes > 0
? " by at least " + formatBytes(shortageBytes)
: " for the query engine/operator memory pool",
formatBytes(sourceColumnMemoryCostForResultSet),
formatBytes(generatedResultSetColumnMemoryCost),
formatBytes(requestedBytes),
formatBytes(freeBytes),
e.getMessage()));
}

private MemoryNotEnoughException enrichSchemaFetchMemoryNotEnoughException(
MemoryNotEnoughException e, long requestedBytes) {
long freeBytes = LocalExecutionPlanner.getInstance().getFreeMemoryForOperators();
if (!resultSetColumnMemoryTrackingEnabled && schemaFetchDeserializedColumnCount == 0) {
return e;
}

long shortageBytes =
requestedBytes > 0 && requestedBytes > freeBytes ? requestedBytes - freeBytes : -1;
long exceededColumns = estimateExceededSchemaFetchColumns(freeBytes, requestedBytes);

return new MemoryNotEnoughException(
String.format(
Locale.ROOT,
"Not enough memory while fetching metadata for query analysis. "
+ "The result set may have too many columns. "
+ "Before the failure, IoTDB had deserialized %,d time-series columns from schema "
+ "fetch results. Schema fetch memory may be reserved before safely deserializing "
+ "the whole fetched metadata, so this count can be lower than the matched schema "
+ "columns. %s"
+ "Current series pagination is %s. "
+ "Use SLIMIT/SOFFSET to reduce returned series%s, narrow the path pattern, "
+ "or increase query memory%s. "
+ "Memory details: fetched schema tree estimated memory %s, "
+ "fetched schema tree reserved memory %s, requested this time %s, "
+ "current free memory %s. Original error: %s",
schemaFetchDeserializedColumnCount,
exceededColumns > 0
? String.format(
Locale.ROOT,
"The fetched schema columns exceed the estimated current memory capacity by "
+ "at least %,d columns. ",
exceededColumns)
: "",
formatSeriesPaginationForDiagnostics(),
alignByDeviceForResultSetColumnTracking
? ""
: ", use ALIGN BY DEVICE to reduce cross-device result columns",
shortageBytes > 0
? " by at least " + formatBytes(shortageBytes)
: " for the query engine/operator memory pool",
formatBytes(schemaFetchEstimatedMemoryCost),
formatBytes(schemaFetchReservedMemoryCost),
formatBytes(requestedBytes),
formatBytes(freeBytes),
e.getMessage()));
}

private long estimateExceededColumns(long freeBytes, long requestedBytes) {
long avgColumnMemory;
if (expandedSourceColumnsForResultSet > 0 && sourceColumnMemoryCostForResultSet > 0) {
avgColumnMemory =
Math.max(1, sourceColumnMemoryCostForResultSet / expandedSourceColumnsForResultSet);
} else if (requestedBytes > 0) {
avgColumnMemory = requestedBytes;
} else {
return -1;
}
long estimatedCapacity =
(sourceColumnMemoryCostForResultSet + Math.max(freeBytes, 0)) / avgColumnMemory;
long columnsToCompare =
Math.max(matchedSourceColumnsForResultSet, expandedSourceColumnsForResultSet + 1);
return Math.max(0, columnsToCompare - estimatedCapacity);
}

private long estimateExceededSchemaFetchColumns(long freeBytes, long requestedBytes) {
if (schemaFetchDeserializedColumnCount <= 0) {
return -1;
}

long avgColumnMemory;
long columnsToCompare = schemaFetchDeserializedColumnCount;
if (schemaFetchReservedMemoryCost > 0) {
avgColumnMemory =
Math.max(
1, divideCeil(schemaFetchReservedMemoryCost, schemaFetchDeserializedColumnCount));
if (requestedBytes > 0) {
columnsToCompare += Math.max(1, divideCeil(requestedBytes, avgColumnMemory));
}
} else if (requestedBytes > 0) {
avgColumnMemory = Math.max(1, divideCeil(requestedBytes, schemaFetchDeserializedColumnCount));
} else {
return -1;
}

long estimatedCapacity =
(schemaFetchReservedMemoryCost + Math.max(freeBytes, 0)) / avgColumnMemory;
return Math.max(0, columnsToCompare - estimatedCapacity);
}

private static long divideCeil(long dividend, long divisor) {
return dividend / divisor + (dividend % divisor == 0 ? 0 : 1);
}

private String formatSeriesPaginationForDiagnostics() {
return String.format(
Locale.ROOT,
"SLIMIT=%s, SOFFSET=%,d",
seriesLimitForResultSetColumnTracking > 0
? String.format(Locale.ROOT, "%,d", seriesLimitForResultSetColumnTracking)
: "not set",
seriesOffsetForResultSetColumnTracking);
}

private static long extractRequestedMemory(MemoryNotEnoughException e) {
String message = e.getMessage();
if (message == null) {
return -1;
}
String marker = "the memory requested this time is ";
int start = message.indexOf(marker);
if (start < 0) {
return -1;
}
start += marker.length();
int end = message.indexOf('B', start);
if (end < 0) {
return -1;
}
try {
return Long.parseLong(message.substring(start, end));
} catch (NumberFormatException ignored) {
return -1;
}
}

private static String formatBytes(long bytes) {
if (bytes < 0) {
return "unknown";
}
if (bytes < 1024) {
return bytes + " B";
}
double value = bytes;
String[] units = {"B", "KB", "MB", "GB", "TB"};
int unitIndex = 0;
while (value >= 1024 && unitIndex < units.length - 1) {
value /= 1024;
unitIndex++;
}
return String.format(Locale.ROOT, "%.2f %s (%d B)", value, units[unitIndex], bytes);
}

public boolean useSampledAvgTimeseriesOperandMemCost() {
return numsOfSampledTimeseriesOperand >= MIN_SIZE_TO_USE_SAMPLED_TIMESERIES_OPERAND_MEM_COST;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,17 +570,24 @@ public static class SchemaNodeBatchDeserializer {
private Map<Integer, Template> templateMap = new HashMap<>();
private boolean isFirstBatch = true;

private long measurementCount = 0;

public boolean isFirstBatch() {
return isFirstBatch;
}

public long getMeasurementCount() {
return measurementCount;
}

public void deserializeFromBatch(InputStream inputStream) throws IOException {
isFirstBatch = false;
while (inputStream.available() > 0) {
nodeType = ReadWriteIOUtils.readByte(inputStream);
if (nodeType == SCHEMA_MEASUREMENT_NODE) {
SchemaMeasurementNode measurementNode = SchemaMeasurementNode.deserialize(inputStream);
stack.push(measurementNode);
measurementCount++;
if (measurementNode.isLogicalView()) {
hasLogicalView = true;
}
Expand Down Expand Up @@ -638,6 +645,7 @@ private void reset() {
// templateMap is set to the returned schema tree, so we should create a new one
templateMap = new HashMap<>();
isFirstBatch = true;
measurementCount = 0;
}
}

Expand Down
Loading
Loading