Skip to content

Commit

Permalink
Pipe: Pattern parsing pruning: When pattern is at the level below dat…
Browse files Browse the repository at this point in the history
…abase, the parsing logic can be skipped if tsfiles / tablets completely match with the pattern (#12049)

Co-authored-by: Steve Yurong Su <[email protected]>
  • Loading branch information
Caideyipi and SteveYurongSu authored Feb 20, 2024
1 parent bcedeb7 commit 76702b0
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,12 @@ public void skipParsingTime() {
isTimeParsed = true;
}

public boolean shouldParsePatternOrTime() {
return !isPatternParsed || !isTimeParsed;
public boolean shouldParseTimeOrPattern() {
return shouldParseTime() || shouldParsePattern();
}

public boolean shouldParsePattern() {
return !isPatternParsed;
}

public boolean shouldParseTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ public Tablet convertToTablet() {

/////////////////////////// parsePatternOrTime ///////////////////////////

@Override
public boolean shouldParsePattern() {
final InsertNode node = getInsertNodeViaCacheIfPossible();
return super.shouldParsePattern()
&& (Objects.isNull(node) || !node.getDevicePath().getFullPath().startsWith(pattern));
}

public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
return new PipeRawTabletInsertionEvent(
convertToTablet(), isAligned, pipeName, pipeTaskMeta, this, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public boolean isAligned() {
}

public Tablet convertToTablet() {
if (!shouldParsePatternOrTime()) {
if (!shouldParseTimeOrPattern()) {
return tablet;
}

Expand All @@ -206,7 +206,7 @@ public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
}

public boolean hasNoNeedParsingAndIsEmpty() {
return !shouldParsePatternOrTime() && tablet.rowSize == 0;
return !shouldParseTimeOrPattern() && tablet.rowSize == 0;
}

/////////////////////////// Object ///////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,41 @@ public boolean isEventTimeOverlappedWithTimeRange() {

/////////////////////////// TsFileInsertionEvent ///////////////////////////

@Override
public boolean shouldParseTimeOrPattern() {
boolean shouldParseTimeOrPattern = false;
try {
shouldParseTimeOrPattern = super.shouldParseTimeOrPattern();
return shouldParseTimeOrPattern;
} finally {
// Super method will call shouldParsePattern() and then init dataContainer at
// shouldParsePattern(). If shouldParsePattern() returns false, dataContainer will
// not be used, so we need to close the resource here.
if (!shouldParseTimeOrPattern) {
close();
}
}
}

@Override
public boolean shouldParsePattern() {
return super.shouldParsePattern() && initDataContainer().shouldParsePattern();
}

@Override
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
return initDataContainer().toTabletInsertionEvents();
}

private TsFileInsertionDataContainer initDataContainer() {
try {
if (dataContainer == null) {
waitForTsFileClose();
dataContainer =
new TsFileInsertionDataContainer(
tsFile, getPattern(), startTime, endTime, pipeTaskMeta, this);
}
return dataContainer.toTabletInsertionEvents();
return dataContainer;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public class TsFileInsertionDataContainer implements AutoCloseable {
private final Map<String, Boolean> deviceIsAlignedMap;
private final Map<String, TSDataType> measurementDataTypeMap;

private boolean shouldParsePattern = false;

public TsFileInsertionDataContainer(File tsFile, String pattern, long startTime, long endTime)
throws IOException {
this(tsFile, pattern, startTime, endTime, null, null);
Expand Down Expand Up @@ -161,13 +163,23 @@ else if (pattern.length() > deviceId.length() && pattern.startsWith(deviceId)) {
// high cost check comes later
&& pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) {
filteredMeasurements.add(measurement);
} else {
// Parse pattern iff there are measurements filtered out
shouldParsePattern = true;
}
}

if (!filteredMeasurements.isEmpty()) {
filteredDeviceMeasurementsMap.put(deviceId, filteredMeasurements);
}
}

// case 3: for example, pattern is root.a.b.c and device is root.a.b.d
// in this case, no data can be matched
else {
// Parse pattern iff there are measurements filtered out
shouldParsePattern = true;
}
}

return filteredDeviceMeasurementsMap;
Expand Down Expand Up @@ -254,6 +266,10 @@ public TabletInsertionEvent next() {
};
}

public boolean shouldParsePattern() {
return shouldParsePattern;
}

@Override
public void close() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public synchronized void collect(Event event) {
}

private void parseAndCollectEvent(PipeInsertNodeTabletInsertionEvent sourceEvent) {
if (sourceEvent.shouldParsePatternOrTime()) {
if (sourceEvent.shouldParseTimeOrPattern()) {
final PipeRawTabletInsertionEvent parsedEvent = sourceEvent.parseEventWithPatternOrTime();
if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
collectEvent(parsedEvent);
Expand All @@ -89,7 +89,7 @@ private void parseAndCollectEvent(PipeInsertNodeTabletInsertionEvent sourceEvent
}

private void parseAndCollectEvent(PipeRawTabletInsertionEvent sourceEvent) {
if (sourceEvent.shouldParsePatternOrTime()) {
if (sourceEvent.shouldParseTimeOrPattern()) {
final PipeRawTabletInsertionEvent parsedEvent = sourceEvent.parseEventWithPatternOrTime();
if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
collectEvent(parsedEvent);
Expand All @@ -107,7 +107,7 @@ private void parseAndCollectEvent(PipeTsFileInsertionEvent sourceEvent) throws E
return;
}

if (!sourceEvent.shouldParsePatternOrTime()) {
if (!sourceEvent.shouldParseTimeOrPattern()) {
collectEvent(sourceEvent);
return;
}
Expand Down

0 comments on commit 76702b0

Please sign in to comment.