From 4db0b3d0c2e5d210fd466b3d99980d3964ca90aa Mon Sep 17 00:00:00 2001 From: immanuwell Date: Fri, 26 Jun 2026 12:38:47 +0400 Subject: [PATCH] fix: disable last-N optimization when pipes can overwrite _time --- apptest/tests/lastnoptimization_test.go | 35 +++++++++++++++++++++++++ lib/logstorage/parser_test.go | 14 +++++++--- lib/logstorage/pipe_extract.go | 7 +++-- lib/logstorage/pipe_extract_regexp.go | 7 +++-- lib/logstorage/pipe_unpack_json.go | 4 +-- lib/logstorage/pipe_unpack_logfmt.go | 4 +-- 6 files changed, 57 insertions(+), 14 deletions(-) diff --git a/apptest/tests/lastnoptimization_test.go b/apptest/tests/lastnoptimization_test.go index bdb30119c4..bb615a662a 100644 --- a/apptest/tests/lastnoptimization_test.go +++ b/apptest/tests/lastnoptimization_test.go @@ -88,3 +88,38 @@ func TestVlsingleLastnOptimization(t *testing.T) { } assertLogsQLResponseEqual(t, got, wantResponse) } + +func TestVlsingleLastnOptimizationWithUnpackedInvalidTime(t *testing.T) { + fs.MustRemoveDir(t.Name()) + tc := apptest.NewTestCase(t) + defer tc.Stop() + sut := tc.MustStartDefaultVlsingle() + + sut.JSONLineWrite(t, []string{ + `{"_msg":"{\"_time\":\"not-a-timestamp\",\"msg\":\"bad\"}","_time":"2025-01-01T01:00:00Z"}`, + }, apptest.IngestOpts{}) + sut.ForceFlush(t) + + query := `* | unpack_json | keep _msg, _time | sort by (_time desc) limit 1` + + got := sut.LogsQLQuery(t, query, apptest.QueryOpts{ + Start: "2025-01-01T01:00:00Z", + End: "2025-01-01T01:00:00.000000001Z", + }) + wantResponse := &apptest.LogsQLQueryResponse{ + LogLines: []string{ + `{"_msg":"{\"_time\":\"not-a-timestamp\",\"msg\":\"bad\"}","_time":"not-a-timestamp"}`, + }, + } + assertLogsQLResponseEqual(t, got, wantResponse) + + res, statusCode := sut.LogsQLQueryRaw(t, query, apptest.QueryOpts{ + Start: "2025-01-01T01:00:00Z", + End: "2025-01-01T01:00:03Z", + }) + if statusCode != 200 { + t.Fatalf("unexpected response status code for optimized query: %d; response\n%s", statusCode, res) + } + got = 
apptest.NewLogsQLQueryResponse(t, res) + assertLogsQLResponseEqual(t, got, wantResponse) +} diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 11b9c202fc..b874c4f139 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -4154,7 +4154,9 @@ func TestQueryCanReturnLastNResults(t *testing.T) { f("* | delete foo, bar", true) f("* | drop_empty_fields", true) f("* | extract '<foo>bar'", true) + f("* | extract '<_time>bar'", false) f("* | extract_regexp 'foo(?P<bar>baz)'", true) + f("* | extract_regexp 'foo(?P<_time>baz)'", false) f("* | facets", false) f("* | field_names", false) f("* | fields foo, bar", false) @@ -4201,8 +4203,12 @@ func TestQueryCanReturnLastNResults(t *testing.T) { f("* | total_stats count()", false) f("* | union (x)", false) f("* | uniq (x)", false) - f("* | unpack_json x", true) - f("* | unpack_logfmt x", true) + f("* | unpack_json x", false) + f("* | unpack_json fields (_time)", false) + f("* | unpack_json result_prefix x_", true) + f("* | unpack_logfmt x", false) + f("* | unpack_logfmt fields (_time)", false) + f("* | unpack_logfmt result_prefix x_", true) f("* | unpack_syslog x", true) f("* | unpack_words x", true) f("* | unroll by (x)", true) @@ -4386,8 +4392,8 @@ func TestQueryGetStatsLabelsAddGroupingByTime_Success(t *testing.T) { f("* | replace_regexp ('foo', 'bar') | count() x", nsecsPerDay, 0, []string{"_time"}, `* | replace_regexp (foo, bar) | stats by (_time:86400000000000) count(*) as x`) f("* | split 'foo' | count() x", nsecsPerDay, 0, []string{"_time"}, `* | split foo | stats by (_time:86400000000000) count(*) as x`) f("* | time_add 1h | count() x", nsecsPerDay, 0, []string{"_time"}, `* | time_add 1h | stats by (_time:86400000000000) count(*) as x`) - f("* | unpack_json x | count() x", nsecsPerDay, 0, []string{"_time"}, `* | unpack_json from x | stats by (_time:86400000000000) count(*) as x`) - f("* | unpack_logfmt x | count() x", nsecsPerDay, 0, []string{"_time"}, `* | unpack_logfmt from 
x | stats by (_time:86400000000000) count(*) as x`) + f("* | unpack_json x result_prefix x_ | count() x", nsecsPerDay, 0, []string{"_time"}, `* | unpack_json from x result_prefix x_ | stats by (_time:86400000000000) count(*) as x`) + f("* | unpack_logfmt x result_prefix x_ | count() x", nsecsPerDay, 0, []string{"_time"}, `* | unpack_logfmt from x result_prefix x_ | stats by (_time:86400000000000) count(*) as x`) f("* | unpack_syslog x | count() x", nsecsPerDay, 0, []string{"_time"}, `* | unpack_syslog from x | stats by (_time:86400000000000) count(*) as x`) f("* | unpack_words x | count() x", nsecsPerDay, 0, []string{"_time"}, `* | unpack_words from x | stats by (_time:86400000000000) count(*) as x`) f("* | unroll by (x) | count() x", nsecsPerDay, 0, []string{"_time"}, `* | unroll by (x) | stats by (_time:86400000000000) count(*) as x`) diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index 09b4fc9534..d6bafe0fd4 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -52,8 +52,11 @@ func (pe *pipeExtract) canLiveTail() bool { } func (pe *pipeExtract) canReturnLastNResults() bool { - // TODO: properly verify that the extracted fields do not overwrite the _time field with non-timestamp values. - + for _, f := range pe.ptn.fields { + if f.name == "_time" { + return false + } + } return true } diff --git a/lib/logstorage/pipe_extract_regexp.go b/lib/logstorage/pipe_extract_regexp.go index 70c5864e4d..6c855af4d9 100644 --- a/lib/logstorage/pipe_extract_regexp.go +++ b/lib/logstorage/pipe_extract_regexp.go @@ -59,8 +59,11 @@ func (pe *pipeExtractRegexp) canLiveTail() bool { } func (pe *pipeExtractRegexp) canReturnLastNResults() bool { - // TODO: properly verify that the extracted fields do not overwrite the _time field with non-timestamp values. 
- + for _, f := range pe.reFields { + if f == "_time" { + return false + } + } return true } diff --git a/lib/logstorage/pipe_unpack_json.go b/lib/logstorage/pipe_unpack_json.go index 6a9fa0b9ee..9e47acf2ed 100644 --- a/lib/logstorage/pipe_unpack_json.go +++ b/lib/logstorage/pipe_unpack_json.go @@ -67,9 +67,7 @@ func (pu *pipeUnpackJSON) canLiveTail() bool { } func (pu *pipeUnpackJSON) canReturnLastNResults() bool { - // TODO: verify that the unpacked fields do not overwrite _time with non-timestamp values. - - return true + return pu.resultPrefix != "" || !prefixfilter.MatchFilters(pu.fieldFilters, "_time") } func (pu *pipeUnpackJSON) isFixedOutputFieldsOrder() bool { diff --git a/lib/logstorage/pipe_unpack_logfmt.go b/lib/logstorage/pipe_unpack_logfmt.go index d6ae35f5c8..0bee15d1c9 100644 --- a/lib/logstorage/pipe_unpack_logfmt.go +++ b/lib/logstorage/pipe_unpack_logfmt.go @@ -58,9 +58,7 @@ func (pu *pipeUnpackLogfmt) canLiveTail() bool { } func (pu *pipeUnpackLogfmt) canReturnLastNResults() bool { - // TODO: verify that the unpacked fields do not overwrite _time with non-timestamp values. - - return true + return pu.resultPrefix != "" || !prefixfilter.MatchFilters(pu.fieldFilters, "_time") } func (pu *pipeUnpackLogfmt) isFixedOutputFieldsOrder() bool {