Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions apptest/tests/lastnoptimization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,38 @@ func TestVlsingleLastnOptimization(t *testing.T) {
}
assertLogsQLResponseEqual(t, got, wantResponse)
}

func TestVlsingleLastnOptimizationWithUnpackedInvalidTime(t *testing.T) {
fs.MustRemoveDir(t.Name())
tc := apptest.NewTestCase(t)
defer tc.Stop()
sut := tc.MustStartDefaultVlsingle()

sut.JSONLineWrite(t, []string{
`{"_msg":"{\"_time\":\"not-a-timestamp\",\"msg\":\"bad\"}","_time":"2025-01-01T01:00:00Z"}`,
}, apptest.IngestOpts{})
sut.ForceFlush(t)

query := `* | unpack_json | keep _msg, _time | sort by (_time desc) limit 1`

got := sut.LogsQLQuery(t, query, apptest.QueryOpts{
Start: "2025-01-01T01:00:00Z",
End: "2025-01-01T01:00:00.000000001Z",
})
wantResponse := &apptest.LogsQLQueryResponse{
LogLines: []string{
`{"_msg":"{\"_time\":\"not-a-timestamp\",\"msg\":\"bad\"}","_time":"not-a-timestamp"}`,
},
}
assertLogsQLResponseEqual(t, got, wantResponse)

res, statusCode := sut.LogsQLQueryRaw(t, query, apptest.QueryOpts{
Start: "2025-01-01T01:00:00Z",
End: "2025-01-01T01:00:03Z",
})
if statusCode != 200 {
t.Fatalf("unexpected response status code for optimized query: %d; response\n%s", statusCode, res)
}
got = apptest.NewLogsQLQueryResponse(t, res)
assertLogsQLResponseEqual(t, got, wantResponse)
}
14 changes: 10 additions & 4 deletions lib/logstorage/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4154,7 +4154,9 @@ func TestQueryCanReturnLastNResults(t *testing.T) {
f("* | delete foo, bar", true)
f("* | drop_empty_fields", true)
f("* | extract '<foo>bar<baz>'", true)
f("* | extract '<_time>bar<baz>'", false)
f("* | extract_regexp 'foo(?P<bar>baz)'", true)
f("* | extract_regexp 'foo(?P<_time>baz)'", false)
f("* | facets", false)
f("* | field_names", false)
f("* | fields foo, bar", false)
Expand Down Expand Up @@ -4201,8 +4203,12 @@ func TestQueryCanReturnLastNResults(t *testing.T) {
f("* | total_stats count()", false)
f("* | union (x)", false)
f("* | uniq (x)", false)
f("* | unpack_json x", true)
f("* | unpack_logfmt x", true)
f("* | unpack_json x", false)
f("* | unpack_json fields (_time)", false)
f("* | unpack_json result_prefix x_", true)
f("* | unpack_logfmt x", false)
f("* | unpack_logfmt fields (_time)", false)
f("* | unpack_logfmt result_prefix x_", true)
f("* | unpack_syslog x", true)
f("* | unpack_words x", true)
f("* | unroll by (x)", true)
Expand Down Expand Up @@ -4386,8 +4392,8 @@ func TestQueryGetStatsLabelsAddGroupingByTime_Success(t *testing.T) {
f("* | replace_regexp ('foo', 'bar') | count() x", nsecsPerDay, 0, []string{"_time"}, `* | replace_regexp (foo, bar) | stats by (_time:86400000000000) count(*) as x`)
f("* | split 'foo' | count() x", nsecsPerDay, 0, []string{"_time"}, `* | split foo | stats by (_time:86400000000000) count(*) as x`)
f("* | time_add 1h | count() x", nsecsPerDay, 0, []string{"_time"}, `* | time_add 1h | stats by (_time:86400000000000) count(*) as x`)
f("* | unpack_json x | count() x", nsecsPerDay, 0, []string{"_time"}, `* | unpack_json from x | stats by (_time:86400000000000) count(*) as x`)
f("* | unpack_logfmt x | count() x", nsecsPerDay, 0, []string{"_time"}, `* | unpack_logfmt from x | stats by (_time:86400000000000) count(*) as x`)
f("* | unpack_json x result_prefix x_ | count() x", nsecsPerDay, 0, []string{"_time"}, `* | unpack_json from x result_prefix x_ | stats by (_time:86400000000000) count(*) as x`)
f("* | unpack_logfmt x result_prefix x_ | count() x", nsecsPerDay, 0, []string{"_time"}, `* | unpack_logfmt from x result_prefix x_ | stats by (_time:86400000000000) count(*) as x`)
f("* | unpack_syslog x | count() x", nsecsPerDay, 0, []string{"_time"}, `* | unpack_syslog from x | stats by (_time:86400000000000) count(*) as x`)
f("* | unpack_words x | count() x", nsecsPerDay, 0, []string{"_time"}, `* | unpack_words from x | stats by (_time:86400000000000) count(*) as x`)
f("* | unroll by (x) | count() x", nsecsPerDay, 0, []string{"_time"}, `* | unroll by (x) | stats by (_time:86400000000000) count(*) as x`)
Expand Down
7 changes: 5 additions & 2 deletions lib/logstorage/pipe_extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,11 @@ func (pe *pipeExtract) canLiveTail() bool {
}

func (pe *pipeExtract) canReturnLastNResults() bool {
// TODO: properly verify that the extracted fields do not overwrite the _time field with non-timestamp values.

for _, f := range pe.ptn.fields {
if f.name == "_time" {
return false
}
}
return true
}

Expand Down
7 changes: 5 additions & 2 deletions lib/logstorage/pipe_extract_regexp.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ func (pe *pipeExtractRegexp) canLiveTail() bool {
}

func (pe *pipeExtractRegexp) canReturnLastNResults() bool {
// TODO: properly verify that the extracted fields do not overwrite the _time field with non-timestamp values.

for _, f := range pe.reFields {
if f == "_time" {
return false
}
}
return true
}

Expand Down
4 changes: 1 addition & 3 deletions lib/logstorage/pipe_unpack_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ func (pu *pipeUnpackJSON) canLiveTail() bool {
}

func (pu *pipeUnpackJSON) canReturnLastNResults() bool {
// TODO: verify that the unpacked fields do not overwrite _time with non-timestamp values.

return true
return pu.resultPrefix != "" || !prefixfilter.MatchFilters(pu.fieldFilters, "_time")
}

func (pu *pipeUnpackJSON) isFixedOutputFieldsOrder() bool {
Expand Down
4 changes: 1 addition & 3 deletions lib/logstorage/pipe_unpack_logfmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ func (pu *pipeUnpackLogfmt) canLiveTail() bool {
}

func (pu *pipeUnpackLogfmt) canReturnLastNResults() bool {
// TODO: verify that the unpacked fields do not overwrite _time with non-timestamp values.

return true
return pu.resultPrefix != "" || !prefixfilter.MatchFilters(pu.fieldFilters, "_time")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: resultPrefix != "" is not a sufficient safety guarantee: fieldsUnpackerContext.addField concatenates fieldPrefix + name directly, so a prefix like "_" with a field named "time" produces _time. The canReturnLastNResults guard should account for whether any combination of resultPrefix and a matched field name could resolve to _time.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At lib/logstorage/pipe_unpack_logfmt.go, line 61:

<comment>`resultPrefix != ""` is not a sufficient safety guarantee: `fieldsUnpackerContext.addField` concatenates `fieldPrefix + name` directly, so a prefix like `"_"` with a field named `"time"` produces `_time`. The `canReturnLastNResults` guard should account for whether any combination of `resultPrefix` and a matched field name could resolve to `_time`.</comment>

<file context>
@@ -58,9 +58,7 @@ func (pu *pipeUnpackLogfmt) canLiveTail() bool {
-	// TODO: verify that the unpacked fields do not overwrite _time with non-timestamp values.
-
-	return true
+	return pu.resultPrefix != "" || !prefixfilter.MatchFilters(pu.fieldFilters, "_time")
 }
 
</file context>

}

func (pu *pipeUnpackLogfmt) isFixedOutputFieldsOrder() bool {
Expand Down