Skip to content
Open
41 changes: 37 additions & 4 deletions invokeai/app/api/routers/custom_nodes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""FastAPI routes for custom node management."""

import json
import re
import shutil
import subprocess
import sys
Expand Down Expand Up @@ -28,6 +29,7 @@
# were imported by that pack. Used on uninstall to delete only pack-imported workflows
# — deleting by tag alone is unsafe because users can edit tags on their own workflows.
PACK_MANIFEST_FILENAME = ".invokeai_pack_manifest.json"
PACK_NAME_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*$")


class NodePackInfo(BaseModel):
Expand Down Expand Up @@ -84,6 +86,27 @@ def _get_custom_nodes_path() -> Path:
return config.custom_nodes_path


def _validate_pack_name(pack_name: str) -> str:
"""Validate pack directory names before using them in filesystem paths."""
if (
not pack_name
or pack_name in {".", ".."}
or "/" in pack_name
or "\\" in pack_name
or not PACK_NAME_RE.fullmatch(pack_name)
):
raise ValueError("Invalid node pack name.")
return pack_name


def _extract_pack_name_from_source(source: str) -> str:
"""Derive a safe node pack name from a git source string."""
pack_name = source.rstrip("/").split("/")[-1]
if pack_name.endswith(".git"):
pack_name = pack_name[:-4]
return _validate_pack_name(pack_name)


def _get_installed_packs() -> list[NodePackInfo]:
"""Scans the custom nodes directory and returns info about installed packs."""
custom_nodes_path = _get_custom_nodes_path()
Expand Down Expand Up @@ -156,10 +179,15 @@ async def install_custom_node_pack(

source = request.source.strip()

# Extract pack name from URL
pack_name = source.rstrip("/").split("/")[-1]
if pack_name.endswith(".git"):
pack_name = pack_name[:-4]
# Extract and validate pack name before using it in filesystem paths.
try:
pack_name = _extract_pack_name_from_source(source)
except ValueError as exc:
return InstallNodePackResponse(
name="",
success=False,
message=str(exc),
)

target_dir = custom_nodes_path / pack_name

Expand Down Expand Up @@ -266,6 +294,11 @@ async def uninstall_custom_node_pack(
Note: A restart is required for the node removal to take full effect.
Installed nodes from the pack will remain registered until restart.
"""
try:
pack_name = _validate_pack_name(pack_name)
except ValueError as exc:
return UninstallNodePackResponse(name=pack_name, success=False, message=str(exc))

custom_nodes_path = _get_custom_nodes_path()
target_dir = custom_nodes_path / pack_name

Expand Down
65 changes: 65 additions & 0 deletions tests/app/routers/test_custom_nodes.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
"""Tests for the custom nodes router."""

import asyncio
import json
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch

from invokeai.app.api.routers.custom_nodes import (
PACK_MANIFEST_FILENAME,
_extract_pack_name_from_source,
_get_installed_packs,
_import_workflows_from_pack,
_load_node_pack,
_purge_pack_modules,
_read_pack_manifest,
_remove_workflows_by_ids,
_validate_pack_name,
_write_pack_manifest,
uninstall_custom_node_pack,
)
from invokeai.app.invocations.baseinvocation import (
BaseInvocation,
Expand Down Expand Up @@ -77,6 +81,67 @@ def test_finds_multiple_packs_sorted(self, tmp_path: Path) -> None:
assert [p.name for p in packs] == ["alpha_pack", "middle_pack", "zebra_pack"]


class TestPackNameValidation:
"""Tests for safely deriving node pack directory names from install sources."""

def test_validate_pack_name_accepts_safe_names(self) -> None:
assert _validate_pack_name("good_pack") == "good_pack"
assert _validate_pack_name("pack-1.2") == "pack-1.2"

def test_validate_pack_name_rejects_path_segments(self) -> None:
for pack_name in ("", ".", "..", "foo/bar", "..\\outside", "foo\nbar"):
try:
_validate_pack_name(pack_name)
except ValueError:
continue
raise AssertionError(f"Expected {pack_name!r} to be rejected")

def test_extracts_pack_name_from_git_url(self) -> None:
assert _extract_pack_name_from_source("https://github.com/example/my-pack.git") == "my-pack"

def test_rejects_empty_pack_name(self) -> None:
try:
_extract_pack_name_from_source("")
except ValueError:
return
raise AssertionError("Expected empty pack name to be rejected")

def test_rejects_dot_segments(self) -> None:
for source in ("https://github.com/example/..", "https://github.com/example/.git"):
try:
_extract_pack_name_from_source(source)
except ValueError:
continue
raise AssertionError(f"Expected {source!r} to be rejected")

def test_rejects_windows_path_separator_traversal(self) -> None:
try:
_extract_pack_name_from_source("https://github.com/example/..\\outside.git")
except ValueError:
return
raise AssertionError("Expected Windows path separator traversal to be rejected")


class TestUninstallPackNameValidation:
"""Tests that uninstall validates direct path parameters before deleting files."""

def test_rejects_invalid_pack_names_before_filesystem_side_effects(self) -> None:
for pack_name in ("..", ".", "foo/bar", "..\\outside"):
with (
patch("invokeai.app.api.routers.custom_nodes._get_custom_nodes_path") as mock_get_path,
patch("invokeai.app.api.routers.custom_nodes.shutil.rmtree") as mock_rmtree,
patch("invokeai.app.api.routers.custom_nodes._remove_workflows_by_ids") as mock_remove_workflows,
):
response = asyncio.run(uninstall_custom_node_pack(MagicMock(), pack_name))

assert response.name == pack_name
assert response.success is False
assert response.message == "Invalid node pack name."
mock_get_path.assert_not_called()
mock_rmtree.assert_not_called()
mock_remove_workflows.assert_not_called()


class TestImportWorkflowsFromPack:
"""Tests for _import_workflows_from_pack()."""

Expand Down
Loading