Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 115 additions & 8 deletions apisix/plugins/ai-lakera-guard.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ local client = require("apisix.plugins.ai-lakera-guard.client")
local protocols = require("apisix.plugins.ai-protocols")
local binding = require("apisix.plugins.ai-protocols.binding")

local ngx = ngx
local ipairs = ipairs
local type = type
local concat = table.concat
Expand Down Expand Up @@ -114,36 +115,46 @@ local function normalize_messages(messages)
end


local function request_content_moderation(ctx, conf, messages)
-- Scan a conversation with Lakera and decide what to do. Shared by the request
-- (input) and response (output) paths; `label` ("request"/"response") tailors the
-- logs and `failure_message` selects the direction-specific deny text. Returns
-- (deny_code, deny_body) when the traffic must be blocked, or nothing to allow.
local function moderate(ctx, conf, messages, label, failure_message)
if not messages or #messages == 0 then
return
end

local result, err = client.scan(conf, messages)
if err then
if conf.fail_open then
core.log.warn("ai-lakera-guard: ", err, "; fail_open=true, allowing request")
core.log.warn("ai-lakera-guard: ", err, "; fail_open=true, allowing ", label)
return
end
core.log.error("ai-lakera-guard: ", err, "; fail_open=false, blocking request")
return conf.deny_code, deny_message(ctx, conf, conf.request_failure_message)
core.log.error("ai-lakera-guard: ", err, "; fail_open=false, blocking ", label)
return conf.deny_code, deny_message(ctx, conf, failure_message)
end

if not result.flagged then
return
end

-- Log Lakera's full per-detector verdict (every entry, detected or not) so
-- both alert mode and blocked requests are auditable.
core.log.warn("ai-lakera-guard: request flagged by Lakera Guard",
-- both alert mode and blocked traffic are auditable.
core.log.warn("ai-lakera-guard: ", label, " flagged by Lakera Guard",
", breakdown: ", core.json.encode(result.breakdown),
", request_uuid: ", result.request_uuid or "")

if conf.action == "alert" then
return
end

return conf.deny_code, deny_message(ctx, conf, conf.request_failure_message, result.breakdown)
return conf.deny_code, deny_message(ctx, conf, failure_message, result.breakdown)
end


-- Scan LLM output with Lakera. The completion text is wrapped as a single
-- assistant-role message and passed to moderate() together with the
-- "response" label and conf.response_failure_message; whatever moderate()
-- returns is handed straight back to the caller.
local function moderate_response(ctx, conf, text)
    local assistant_turn = { role = "assistant", content = text }
    local conversation = { assistant_turn }
    return moderate(ctx, conf, conversation, "response", conf.response_failure_message)
end


Expand All @@ -160,6 +171,10 @@ function _M.access(conf, ctx)
return
end

if conf.direction == "output" then
return
end

local request_tab, err = core.request.get_json_request_body_table()
if not request_tab then
local handled, code, body = binding.on_unsupported(
Expand Down Expand Up @@ -194,7 +209,7 @@ function _M.access(conf, ctx)
end
end

local code, message = request_content_moderation(ctx, conf, messages)
local code, message = moderate(ctx, conf, messages, "request", conf.request_failure_message)
if code then
if ctx.var.request_type == "ai_stream" then
core.response.set_header("Content-Type", "text/event-stream")
Expand All @@ -206,4 +221,96 @@ function _M.access(conf, ctx)
end


-- Response-side hook: scans the LLM completion with Lakera when
-- conf.direction is "output" or "both".
--
-- Parameters:
--   conf    plugin configuration (direction, action, fail_open,
--           response_failure_message are read here)
--   ctx     request context; ctx.var.request_type, ctx.var.llm_response_text
--           and ctx.var.llm_request_done are read, and the per-request fields
--           ctx.lakera_response_buffer / ctx.lakera_response_decided are
--           written by the streaming path
--   headers not referenced in this function
--   body    the current response chunk (nil is treated as "")
--
-- Returns nothing to leave the current body untouched, or a (code, body)
-- pair. NOTE(review): how the caller interprets that pair is defined by the
-- plugin runner outside this view; the inline comments below describe the
-- behavior this code relies on.
function _M.lua_body_filter(conf, ctx, headers, body)
-- Response scanning is opt-in: with direction "input" this hook does nothing.
if conf.direction ~= "output" and conf.direction ~= "both" then
return
end

-- Responses already carrying an error status (>= 400) are not scanned.
if ngx.status >= 400 then
return
end

-- Non-streaming: ai-proxy hands us the fully-assembled completion text.
-- An absent or empty completion has nothing to scan and is passed through.
if ctx.var.request_type == "ai_chat" then
local text = ctx.var.llm_response_text
if not text or text == "" then
return
end
return moderate_response(ctx, conf, text)
end

if ctx.var.request_type == "ai_stream" then
Comment thread
nic-6443 marked this conversation as resolved.
Comment thread
nic-6443 marked this conversation as resolved.
-- Alert (shadow) mode is non-blocking: chunks are never withheld. Once the
-- stream is marked done, the assembled text is scanned a single time
-- (guarded by ctx.lakera_response_decided). The result of
-- moderate_response is discarded here, so nothing is denied on this path.
if conf.action == "alert" then
if ctx.var.llm_request_done and not ctx.lakera_response_decided then
ctx.lakera_response_decided = "clean"
local text = ctx.var.llm_response_text
if text and text ~= "" then
moderate_response(ctx, conf, text)
else
core.log.info("ai-lakera-guard: alert mode could not scan the ",
"streamed response (no assembled completion)")
end
end
return
end

-- Block mode: accumulate every chunk in a per-request buffer kept on ctx.
local buffer = ctx.lakera_response_buffer
if not buffer then
buffer = {}
ctx.lakera_response_buffer = buffer
end

-- A verdict was already reached on an earlier call. After a block, any
-- further chunk is replaced with an SSE comment; after a clean verdict,
-- later chunks are left untouched.
if ctx.lakera_response_decided then
if ctx.lakera_response_decided == "blocked" then
return nil, ":\n\n"
end
return
end

buffer[#buffer + 1] = body or ""

if not ctx.var.llm_request_done then
Comment thread
nic-6443 marked this conversation as resolved.
-- Withhold this chunk until end-of-stream, replacing it with an SSE
-- keep-alive comment. Not "" (nginx treats an empty body as nothing
-- to flush) and not nil (which would let the original chunk reach
-- the client) -- the keep-alive holds the content back while keeping
-- the connection open.
return nil, ":\n\n"
end

-- End of stream. Three cases for the assembled completion:
--   ""   -> nothing to scan; release the buffered chunks unscanned
--   nil  -> no completion was assembled; follow conf.fail_open
--   text -> scan it and block or release accordingly
local text = ctx.var.llm_response_text
if text == "" then
ctx.lakera_response_decided = "clean"
return nil, concat(buffer)
end
if not text then
if conf.fail_open then
core.log.warn("ai-lakera-guard: streamed response ended without ",
"an assembled completion (no upstream usage event?); ",
"fail_open=true, releasing unscanned")
ctx.lakera_response_decided = "clean"
return nil, concat(buffer)
end
core.log.error("ai-lakera-guard: streamed response ended without ",
"an assembled completion (no upstream usage event?); ",
"fail_open=false, blocking response")
ctx.lakera_response_decided = "blocked"
return ngx.OK, deny_message(ctx, conf, conf.response_failure_message)
end

-- The code returned by moderate_response only signals "deny" here: it is
-- dropped and ngx.OK is returned with the deny body instead.
local code, message = moderate_response(ctx, conf, text)
if code then
ctx.lakera_response_decided = "blocked"
return ngx.OK, message
end

-- Clean: release the buffered stream verbatim, preserving SSE framing.
ctx.lakera_response_decided = "clean"
return nil, concat(buffer)
Comment thread
AlinsRan marked this conversation as resolved.
end
end


return _M
10 changes: 7 additions & 3 deletions apisix/plugins/ai-lakera-guard/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@ local schema = {
},
direction = {
type = "string",
-- input only in this phase; output/both are added in later phases.
enum = { "input" },
enum = { "input", "output", "both" },
default = "input",
description = "Which traffic to scan.",
description = "Which traffic to scan: input (request), output (response), or both.",
},
action = {
type = "string",
Expand Down Expand Up @@ -90,6 +89,11 @@ local schema = {
default = "Request blocked by Lakera Guard",
description = "Message returned when a request is blocked.",
},
response_failure_message = {
type = "string",
default = "Response blocked by Lakera Guard",
description = "Message returned when an LLM response is blocked.",
},
},
encrypt_fields = { "api_key" },
required = { "api_key" },
Expand Down
32 changes: 27 additions & 5 deletions apisix/plugins/ai-providers/base.lua
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,10 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co
ngx.thread.kill(flush_thread)
flush_thread = nil
end
if output_sent and not ctx.var.llm_request_done then
ctx.var.llm_request_done = true
plugin.lua_response_filter(ctx, res.headers, "", nil, true)
end
if not flush_err then
ngx.flush(true)
end
Expand Down Expand Up @@ -687,6 +691,16 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co
end
output_sent = true
end

if ctx.var.llm_request_done and #converted_chunks == 0
and output_sent then
local ok, flush_err = plugin.lua_response_filter(
ctx, res.headers, "", no_flush, true)
if not ok then
abort_on_disconnect(flush_err)
return
end
end
else
local ok, flush_err = plugin.lua_response_filter(
ctx, res.headers, chunk, no_flush, true)
Expand Down Expand Up @@ -731,11 +745,19 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co
ctx.var.llm_request_done = true
res._upstream_bytes = bytes_read
if output_sent then
-- Client has already received partial SSE; stop feeding chunks.
-- nginx will close the downstream connection at end of content
-- phase. Clients detect incomplete responses via the absence
-- of a protocol-specific terminator (e.g. OpenAI [DONE],
-- Anthropic message_stop, Responses response.completed).
-- Client has already received partial SSE. Dispatch one final
-- body_filter pass now that llm_request_done is set, so plugins
-- that buffer the whole stream to enforce a block (e.g.
-- ai-lakera-guard) can flush or replace their buffered content
-- instead of stranding it -- otherwise the client is left with
-- only the keep-alive heartbeats and never receives the body.
-- Mirrors the normal end-of-stream path, where llm_request_done
-- is set before the last chunk is filtered. nginx then closes
-- the downstream connection at end of content phase; clients
-- detect the incomplete response via the absence of a
-- protocol-specific terminator (e.g. OpenAI [DONE], Anthropic
-- message_stop, Responses response.completed).
plugin.lua_response_filter(ctx, res.headers, "", nil, true)
return
end
-- No bytes flushed yet (e.g. converter skipped all events so far).
Expand Down
47 changes: 41 additions & 6 deletions docs/en/latest/plugins/ai-lakera-guard.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,7 @@ The `ai-lakera-guard` Plugin should be used with either the [`ai-proxy`](./ai-pr

Requests that did not pass through `ai-proxy`/`ai-proxy-multi` (for example plain HTTP traffic when the Plugin is bound at the Consumer or Service level) cannot be inspected. By default such requests are passed through unchecked; this is configurable via `fail_mode`.

:::note

This release scans **requests** only (`direction: input`). Response and streaming scanning are added in later releases.

:::
The Plugin can scan the request prompt (`direction: input`), the LLM response (`direction: output`), or both (`direction: both`), for non-streaming and streaming (SSE) traffic alike. See [Scanning direction](#scanning-direction) for the behavior of each, including how streamed responses are buffered before they reach the client.

## Attributes

Expand All @@ -60,7 +56,7 @@ This release scans **requests** only (`direction: input`). Response and streamin
| api_key | string | True | | | Lakera Guard API key, sent as `Authorization: Bearer`. The value is encrypted with AES before being stored in etcd, and supports [secret references](../terminology/secret.md) (`$secret://`) and environment variables (`$env://`). |
| lakera_endpoint | string | False | `https://api.lakera.ai/v2/guard` | | Lakera Guard v2 endpoint. Override for regional or self-hosted instances. |
| project_id | string | False | | | Lakera project whose policy (detectors and thresholds) to apply. If unset, the account default policy is used. |
| direction | string | False | `input` | `input` | Which traffic to scan. Only `input` (request) is supported in this release. |
| direction | string | False | `input` | `input`, `output`, `both` | Which traffic to scan. `input` scans the request prompt; `output` scans the LLM response; `both` scans the request and then, only if the request passed, the response. See [Scanning direction](#scanning-direction). |
| action | string | False | `block` | `block`, `alert` | How a flagged verdict is handled. `block` denies the flagged request or response; `alert` is a log-only shadow mode that logs the verdict and passes flagged traffic through. This only governs flagged verdicts — Lakera API errors/timeouts are still controlled by `fail_open` even in `alert` mode. |
| fail_open | boolean | False | `false` | | Behavior when Lakera cannot be reached (timeout, connection error, non-2xx, decode failure). `false` (fail-closed) blocks the request; `true` (fail-open) allows it. A successful `flagged: false` always passes. |
| fail_mode | string | False | `"skip"` | `skip`, `warn`, `error` | Behavior when the request is not a recognized AI request that this Plugin can inspect (for example, plain HTTP traffic on a Consumer-bound Plugin, or a request that did not pass through `ai-proxy`). `skip`: let the request pass through unchecked; `warn`: pass through and log a warning; `error`: reject the request. Distinct from `fail_open`, which governs Lakera API failures. |
Expand All @@ -69,6 +65,29 @@ This release scans **requests** only (`direction: input`). Response and streamin
| reveal_failure_categories | boolean | False | `false` | | If `true`, append the matched Lakera `detector_type`s (with their confidence result) to the deny message returned to the client. The full per-detector `breakdown` is always requested from Lakera and written to the gateway logs regardless of this setting; this flag only controls client-facing exposure. |
| deny_code | integer | False | `200` | 200 - 599 | HTTP status code returned when a request is blocked. Defaults to `200` so the body — a provider-compatible chat completion (or SSE) carrying `request_failure_message` — parses as a normal refusal in client SDKs (matching how Lakera Guard itself returns `200` with a verdict). Set a 4xx (e.g. `403`) if you prefer blocks to surface as HTTP errors. |
| request_failure_message | string | False | `Request blocked by Lakera Guard` | | Refusal text returned (as the assistant message of a provider-compatible response) when a request is blocked. |
| response_failure_message | string | False | `Response blocked by Lakera Guard` | | Refusal text returned (as the assistant message of a provider-compatible response) when an LLM response is blocked (`direction` `output` or `both`). |

## Scanning direction

The `direction` attribute controls which traffic Lakera scans:

- **`input`** (default): the request prompt is scanned before it reaches the LLM. A flagged request is never forwarded; the deny carries `request_failure_message`.
- **`output`**: the request is forwarded unscanned, and the LLM response is scanned before it reaches the client. A flagged response is replaced with a deny carrying `response_failure_message`.
- **`both`**: the request is scanned first; if it passes, the response is scanned too. A flagged request is blocked before the LLM is called (carrying `request_failure_message`), saving an upstream call; otherwise a flagged response is blocked afterwards (carrying `response_failure_message`).

Response scanning (`output`/`both`) requires `ai-proxy`/`ai-proxy-multi`, which assembles the completion text the Plugin sends to Lakera.

### Streaming responses

When the response is streamed (`stream: true`) in `block` mode, the Plugin **buffers the full SSE response, scans the assembled completion once, and only then releases it** to the client. This is required to enforce a block: partial flagged tokens must never reach the client. A clean response is forwarded with its original SSE framing intact; a flagged response is replaced with a provider-compatible deny SSE terminated by `data: [DONE]`. In `alert` mode the Plugin does **not** buffer — chunks flow through live, token by token, and the assembled completion is scanned only to log the verdict (see [Roll Out in Shadow Mode First](#roll-out-in-shadow-mode-first)).

:::note

In `block` mode the Plugin holds the whole streamed response until scanning finishes, then releases it. The client receives it in one piece after the check rather than token by token. A blocked stream is always returned as the deny message in the response body — once a stream has started, the `deny_code` status can no longer be applied.

Some LLM providers stream responses in a way the Plugin cannot reassemble for scanning. When a response cannot be scanned, the Plugin cannot confirm it is safe, so it follows `fail_open`: by default (fail-closed) the response is blocked; with `fail_open: true` it is passed through unscanned and a warning is logged. The same applies when the gateway aborts a stream via `ai-proxy`'s `max_stream_duration_ms` or `max_response_bytes` safeguards, or when the upstream ends the stream without a terminal event: the buffered content has no assembled completion to scan and is handled per `fail_open` above. Only a client disconnect leaves the held content undelivered. A response the Plugin *can* reassemble but that contains no assistant text — for example a tool-call-only turn — has nothing to scan and is released unscanned, matching the non-streaming path (tool-call arguments themselves are not sent to Lakera).

:::

## Examples

Expand Down Expand Up @@ -334,6 +353,22 @@ curl -i "http://127.0.0.1:9080/anything" -X POST \

You should receive an `HTTP/1.1 200 OK` response with the model output, since Lakera did not flag the request.

### Scan Responses as Well as Requests

To also scan what the LLM returns — for example, to catch leaked PII, policy violations, or injection payloads echoed back in the completion — set `direction` to `both` (or to `output` to scan only the response). A flagged response is replaced with a provider-compatible deny carrying `response_failure_message`; streamed responses are buffered, scanned, and then released (see [Scanning direction](#scanning-direction)).

```shell
curl "http://127.0.0.1:9180/apisix/admin/routes/ai-lakera-guard-route" -X PATCH \
-H "X-API-KEY: ${admin_key}" \
-d '{
"plugins": {
"ai-lakera-guard": {
"direction": "both"
}
}
}'
```

### Roll Out in Shadow Mode First

Before enforcing, you can run the Plugin in non-enforcing shadow mode by setting `action` to `alert`. Flagged requests are logged (with the full Lakera `breakdown` and `request_uuid`) but are passed through to the LLM, letting you observe and tune the Lakera policy before turning enforcement on. Note that `alert` only changes how *flagged verdicts* are handled; if Lakera itself cannot be reached, the request is still governed by `fail_open` (fail-closed by default), so set `fail_open` to `true` if shadow-mode traffic must never be blocked.
Expand Down
Loading
Loading