Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
53f9e2d
Initial plan
Copilot Feb 16, 2026
53be649
feat: enable ANN rule in ruff (with temporary ignores)
Copilot Feb 16, 2026
862dfae
feat: remove ANN rule ignores from ruff config
Copilot Feb 17, 2026
b1adcee
feat: add type annotations to test_retry.py
Copilot Feb 17, 2026
b81a427
feat: add type annotations to utils/record.py
Copilot Feb 17, 2026
b7d254f
feat: add type annotations to dpdisp.py
Copilot Feb 17, 2026
d73da14
feat: add type annotations to entrypoints/gui.py
Copilot Feb 17, 2026
303a727
feat: add type annotations to doc/conf.py
Copilot Feb 17, 2026
78a4f8a
feat: add type annotations to tests/context.py
Copilot Feb 17, 2026
0c6faf7
feat: add type annotations to tests/test_class_submission.py
Copilot Feb 17, 2026
67ec629
feat: add type annotations to tests/test_local_context.py
Copilot Feb 17, 2026
0a11056
feat: add type annotations to tests/test_rsync_flags.py
Copilot Feb 17, 2026
64bfea5
feat: add type annotations to tests/test_run_submission_bohrium.py
Copilot Feb 17, 2026
d7add70
feat: add type annotations to tests/sample_class.py
Copilot Feb 17, 2026
84ebd50
feat: add type annotations to tests/test_run_submission.py
Copilot Feb 17, 2026
34eb030
feat: add type annotations to utils/dpcloudserver/zip_file.py
Copilot Feb 17, 2026
fc5101b
feat: add type annotations to utils/hdfs_cli.py
Copilot Feb 17, 2026
56695ce
feat: add type annotations to utils/utils.py
Copilot Feb 17, 2026
55e0e72
feat: add type annotations to utils/dpcloudserver/client.py
Copilot Feb 17, 2026
afa9330
feat: add type annotations to machines/fugaku.py
Copilot Feb 17, 2026
52b4965
feat: add type annotations to machines/JH_UniScheduler.py
Copilot Feb 17, 2026
a8f84f8
feat: add type annotations to machines/shell.py
Copilot Feb 17, 2026
a30b9ff
feat: add type annotations to machines/distributed_shell.py
Copilot Feb 17, 2026
f234453
feat: add type annotations to machines/lsf.py
Copilot Feb 17, 2026
96acd97
feat: add type annotations to machines/slurm.py
Copilot Feb 17, 2026
0a812e8
feat: add type annotations to machines/pbs.py
Copilot Feb 17, 2026
bfb0188
feat: add type annotations to machines/openapi.py
Copilot Feb 17, 2026
23aae05
feat: add type annotations to machines/dp_cloud_server.py
Copilot Feb 17, 2026
b62c3a9
feat: add type annotations to contexts/hdfs_context.py
Copilot Feb 17, 2026
c1304e8
feat: add type annotations to contexts/lazy_local_context.py
Copilot Feb 17, 2026
f669cdd
feat: add type annotations to contexts/local_context.py
Copilot Feb 17, 2026
37b3cb4
feat: add type annotations to contexts/dp_cloud_server_context.py
Copilot Feb 17, 2026
812e052
feat: add type annotations to contexts/openapi_context.py
Copilot Feb 17, 2026
ac45dab
feat: add type annotations to contexts/ssh_context.py
Copilot Feb 17, 2026
c349d57
feat: add type annotations to base_context.py
Copilot Feb 17, 2026
def6dea
feat: add type annotations to machine.py
Copilot Feb 17, 2026
abfc463
feat: add type annotations to submission.py
Copilot Feb 17, 2026
9179f89
feat: add type annotations to remaining files (auto-fixed by ruff)
Copilot Feb 17, 2026
8c43572
refactor: replace Any with proper types in base_context.py
Copilot Feb 18, 2026
631d7eb
refactor: replace Any with proper types in machine.py
Copilot Feb 18, 2026
c13e0da
refactor: replace Any with proper types in submission.py
Copilot Feb 18, 2026
8ac08ec
refactor: replace Any with proper types in contexts/lazy_local_contex…
Copilot Feb 18, 2026
3676dbd
refactor: replace Any with proper types in contexts/local_context.py
Copilot Feb 18, 2026
fc69ed2
refactor: fix download method type in contexts/local_context.py
Copilot Feb 18, 2026
34ec5d5
refactor: replace Any with proper types in contexts/hdfs_context.py
Copilot Feb 18, 2026
1054c57
refactor: replace Any with proper types in contexts/ssh_context.py
Copilot Feb 18, 2026
35122fa
refactor: replace Any with proper types in contexts/dp_cloud_server_c…
Copilot Feb 18, 2026
4bda783
refactor: fix Job reference in openapi_context.py
Copilot Feb 18, 2026
15342d5
refactor: replace Any with proper types in machines/lsf.py
Copilot Feb 18, 2026
83d21a5
refactor: replace Any with proper types in machines/dp_cloud_server.py
Copilot Feb 18, 2026
6e8a844
refactor: replace Any with proper types in machines/openapi.py
Copilot Feb 18, 2026
2604cb3
fix: replace Python 3.10+ union syntax with Union for compatibility
Copilot Feb 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions doc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import os
import sys
from datetime import date
from typing import Any

# sys.path.insert(0, os.path.abspath('.'))

Expand Down Expand Up @@ -68,7 +69,7 @@
master_doc = "index"


def run_apidoc(_):
def run_apidoc(_: Any) -> None: # noqa: ANN401
from sphinx.ext.apidoc import main

sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
Expand All @@ -89,7 +90,7 @@ def run_apidoc(_):
)


def setup(app):
def setup(app: Any) -> None: # noqa: ANN401
app.connect("builder-inited", run_apidoc)


Expand Down
37 changes: 23 additions & 14 deletions dpdispatcher/base_context.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from abc import ABCMeta, abstractmethod
from typing import Any, List, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Tuple

from dargs import Argument

from dpdispatcher.dlog import dlog

if TYPE_CHECKING:
from dpdispatcher.submission import Submission


class BaseContext(metaclass=ABCMeta):
subclasses_dict = {}
Expand All @@ -13,15 +16,15 @@ class BaseContext(metaclass=ABCMeta):
# notes: this attribute can be inherited
alias: Tuple[str, ...] = tuple()

def __new__(cls, *args, **kwargs):
def __new__(cls, *args: Any, **kwargs: Any) -> "BaseContext": # noqa: ANN401
if cls is BaseContext:
subcls = cls.subclasses_dict[kwargs["context_type"]]
instance = subcls.__new__(subcls, *args, **kwargs)
else:
instance = object.__new__(cls)
return instance

def __init_subclass__(cls, **kwargs):
def __init_subclass__(cls, **kwargs: Any) -> None: # noqa: ANN401
super().__init_subclass__(**kwargs)
alias = [cls.__name__, *cls.alias]
for aa in alias:
Expand All @@ -32,7 +35,7 @@ def __init_subclass__(cls, **kwargs):
cls.options.add(cls.__name__)

@classmethod
def load_from_dict(cls, context_dict):
def load_from_dict(cls, context_dict: Dict[str, Any]) -> "BaseContext": # noqa: ANN401
context_type = context_dict["context_type"]
# print("debug778:context_type", cls.subclasses_dict, context_type)
try:
Expand All @@ -45,35 +48,41 @@ def load_from_dict(cls, context_dict):
context = context_class.load_from_dict(context_dict)
return context

def bind_submission(self, submission):
def bind_submission(self, submission: "Submission") -> None:
self.submission = submission

@abstractmethod
def upload(self, submission):
def upload(self, submission: "Submission") -> None:
raise NotImplementedError("abstract method")

@abstractmethod
def download(
self, submission, check_exists=False, mark_failure=True, back_error=False
):
self,
submission: "Submission",
check_exists: bool = False,
mark_failure: bool = True,
back_error: bool = False,
) -> None:
raise NotImplementedError("abstract method")

@abstractmethod
def clean(self):
def clean(self) -> None:
raise NotImplementedError("abstract method")

@abstractmethod
def write_file(self, fname, write_str):
def write_file(self, fname: str, write_str: str) -> None:
raise NotImplementedError("abstract method")

@abstractmethod
def read_file(self, fname):
def read_file(self, fname: str) -> str:
raise NotImplementedError("abstract method")

def check_finish(self, proc):
def check_finish(self, proc: Any) -> Any: # noqa: ANN401
raise NotImplementedError("abstract method")

def block_checkcall(self, cmd, asynchronously=False) -> Tuple[Any, Any, Any]:
def block_checkcall(
self, cmd: str, asynchronously: bool = False
) -> Tuple[Any, Any, Any]: # noqa: ANN401
"""Run command with arguments. Wait for command to complete.

Parameters
Expand Down Expand Up @@ -112,7 +121,7 @@ def block_checkcall(self, cmd, asynchronously=False) -> Tuple[Any, Any, Any]:
return stdin, stdout, stderr

@abstractmethod
def block_call(self, cmd) -> Tuple[int, Any, Any, Any]:
def block_call(self, cmd: str) -> Tuple[int, Any, Any, Any]: # noqa: ANN401
"""Run command with arguments. Wait for command to complete.

Parameters
Expand Down
61 changes: 35 additions & 26 deletions dpdispatcher/contexts/dp_cloud_server_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import shutil
import uuid
from typing import List
from typing import TYPE_CHECKING, Any, List, NoReturn, Optional

import tqdm
from dargs.dargs import Argument
Expand All @@ -19,6 +19,9 @@
ALI_STS_ENDPOINT,
)

if TYPE_CHECKING:
from dpdispatcher.submission import Job, Submission

# from zip_file import zip_files

DP_CLOUD_SERVER_HOME_DIR = os.path.join(
Expand All @@ -31,12 +34,12 @@ class BohriumContext(BaseContext):

def __init__(
self,
local_root,
remote_root=None,
remote_profile={},
*args,
**kwargs,
):
local_root: str,
remote_root: Optional[str] = None,
remote_profile: dict[str, Any] = {}, # noqa: ANN401
*args: Any, # noqa: ANN401
**kwargs: Any, # noqa: ANN401
) -> None:
self.init_local_root = local_root
self.init_remote_root = remote_root
self.temp_local_root = os.path.abspath(local_root)
Expand Down Expand Up @@ -67,7 +70,7 @@ def __init__(
self.api = Client(account, password)

@classmethod
def load_from_dict(cls, context_dict):
def load_from_dict(cls, context_dict: dict[str, Any]) -> "BohriumContext": # noqa: ANN401
local_root = context_dict["local_root"]
remote_root = context_dict.get("remote_root", None)
remote_profile = context_dict.get("remote_profile", {})
Expand All @@ -79,7 +82,7 @@ def load_from_dict(cls, context_dict):
)
return dp_cloud_server_context

def bind_submission(self, submission):
def bind_submission(self, submission: "Submission") -> None:
self.submission = submission
self.local_root = os.path.join(self.temp_local_root, submission.work_base)
self.remote_root = "."
Expand All @@ -92,7 +95,7 @@ def bind_submission(self, submission):
# file_uuid = uuid.uuid1().hex
# oss_task_dir = os.path.join()

def _gen_oss_path(self, job, zip_filename):
def _gen_oss_path(self, job: "Job", zip_filename: str) -> str:
if hasattr(job, "upload_path") and job.upload_path:
return job.upload_path
else:
Expand All @@ -105,7 +108,7 @@ def _gen_oss_path(self, job, zip_filename):
setattr(job, "upload_path", path)
return path

def upload_job(self, job, common_files=None):
def upload_job(self, job: "Job", common_files: Optional[list[str]] = None) -> None:
MAX_RETRY = 3
if common_files is None:
common_files = []
Expand Down Expand Up @@ -133,7 +136,7 @@ def upload_job(self, job, common_files=None):
retry_count = 0
self._backup(self.local_root, upload_zip)

def upload(self, submission):
def upload(self, submission: "Submission") -> None:
# oss_task_dir = os.path.join('%s/%s/%s.zip' % ('indicate', file_uuid, file_uuid))
# zip_filename = submission.submission_hash + '.zip'
# oss_task_zip = 'indicate/' + submission.submission_hash + '/' + zip_filename
Expand Down Expand Up @@ -162,8 +165,12 @@ def upload(self, submission):
# api.upload(self.oss_task_dir, zip_task_file)

def download(
self, submission, check_exists=False, mark_failure=True, back_error=False
):
self,
submission: "Submission",
check_exists: bool = False,
mark_failure: bool = True,
back_error: bool = False,
) -> bool:
jobs = submission.belonging_jobs
job_hashs = {}
job_infos = {}
Expand Down Expand Up @@ -210,7 +217,9 @@ def download(
)
return True

def _check_if_job_has_already_downloaded(self, target, local_root):
def _check_if_job_has_already_downloaded(
self, target: str, local_root: str
) -> bool:
backup_file_location = os.path.join(
local_root, "backup", os.path.split(target)[1]
)
Expand All @@ -219,7 +228,7 @@ def _check_if_job_has_already_downloaded(self, target, local_root):
else:
return False

def _backup(self, local_root, target):
def _backup(self, local_root: str, target: str) -> None:
try:
# move to backup directory
os.makedirs(os.path.join(local_root, "backup"), exist_ok=True)
Expand All @@ -229,45 +238,45 @@ def _backup(self, local_root, target):
except (OSError, shutil.Error) as e:
dlog.exception("unable to backup file, " + str(e))

def _clean_backup(self, local_root, keep_backup=True):
def _clean_backup(self, local_root: str, keep_backup: bool = True) -> None:
if not keep_backup:
dir_to_be_removed = os.path.join(local_root, "backup")
if os.path.exists(dir_to_be_removed):
shutil.rmtree(dir_to_be_removed)

def write_file(self, fname, write_str):
def write_file(self, fname: str, write_str: str) -> bool:
result = self.write_home_file(fname, write_str)
return result

def write_local_file(self, fname, write_str):
def write_local_file(self, fname: str, write_str: str) -> str:
local_filename = os.path.join(self.local_root, fname)
with open(local_filename, "w") as f:
f.write(write_str)
return local_filename

def read_file(self, fname):
def read_file(self, fname: str) -> str:
result = self.read_home_file(fname)
return result

def write_home_file(self, fname, write_str):
def write_home_file(self, fname: str, write_str: str) -> bool:
# os.makedirs(self.remote_root, exist_ok = True)
with open(os.path.join(DP_CLOUD_SERVER_HOME_DIR, fname), "w") as fp:
fp.write(write_str)
return True

def read_home_file(self, fname):
def read_home_file(self, fname: str) -> str:
with open(os.path.join(DP_CLOUD_SERVER_HOME_DIR, fname)) as fp:
ret = fp.read()
return ret

def check_file_exists(self, fname):
def check_file_exists(self, fname: str) -> bool:
result = self.check_home_file_exits(fname)
return result

def check_home_file_exits(self, fname):
def check_home_file_exits(self, fname: str) -> bool:
return os.path.isfile(os.path.join(DP_CLOUD_SERVER_HOME_DIR, fname))

def clean(self):
def clean(self) -> bool:
submission_file_name = f"{self.submission.submission_hash}.json"
submission_json = os.path.join(DP_CLOUD_SERVER_HOME_DIR, submission_file_name)
os.remove(submission_json)
Expand Down Expand Up @@ -337,7 +346,7 @@ def machine_subfields(cls) -> List[Argument]:
)
]

def block_call(self, cmd):
def block_call(self, cmd: str) -> NoReturn:
raise RuntimeError(
"Unsupported method. You may use an unsupported combination of the machine and the context."
)
Expand Down
Loading
Loading