Source code for sparc.client.services.o2sparc

import logging
import os
from configparser import SectionProxy
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, TypeAlias
from zipfile import ZipFile, is_zipfile

import osparc
from osparc.models.profile import Profile

from ._default import ServiceBase

ConfigDict: TypeAlias = dict[str, Any] | SectionProxy
UserNameStr: TypeAlias = str
JobId: TypeAlias = str


class O2SparcSolver:
    """
    Wrapper for osparc.Solver

    Binds one solver release (key + version) to an ``osparc.ApiClient`` and
    offers job submission, status polling, and retrieval of results and logs.
    """

    def __init__(self, api_client: osparc.ApiClient, solver_key: str, solver_version: str):
        self._files_api: osparc.FilesApi = osparc.FilesApi(api_client)
        self._solvers_api: osparc.SolversApi = osparc.SolversApi(api_client)
        # Resolved once at construction; every later call reuses its id/version.
        self._solver: osparc.Solver = self._solvers_api.get_solver_release(
            solver_key, solver_version
        )
        # Jobs created through this wrapper, in submission order.
        self._jobs: list[osparc.Job] = []

    def submit_job(self, job_inputs: dict[str, str | int | float | Path]) -> JobId:
        """
        Submit a job to the solver/computational service.

        Parameters:
        -----------
        job_inputs: Dict[str, str | int | float | pathlib.Path]
            When passing a file to the solver, pass it as a pathlib.Path object.

        Returns:
        --------
        A string representing the job id.

        Raises:
        -------
        RuntimeError
            If a pathlib.Path input does not point to an existing file.
        """
        # Validate every file input before uploading anything, so that a bad
        # path does not leave earlier inputs already uploaded.
        for key, value in job_inputs.items():
            if isinstance(value, Path) and not value.is_file():
                raise RuntimeError(f"Input {key} is not a file.")

        inputs: dict[str, str | int | float | osparc.File] = {
            key: self._files_api.upload_file(value) if isinstance(value, Path) else value
            for key, value in job_inputs.items()
        }

        job: osparc.Job = self._solvers_api.create_job(
            self._solver.id, self._solver.version, osparc.JobInputs(inputs)
        )
        self._jobs.append(job)
        self._solvers_api.start_job(self._solver.id, self._solver.version, job.id)
        return job.id

    def get_job_progress(self, job_id: JobId) -> float:
        """
        Get the job progress

        Parameters:
        -----------
        job_id: str
            The job id

        Returns:
        --------
        A float between 0.0 and 1.0 indicating the progress of the job.
        1.0 means the job is done.
        """
        status: osparc.JobStatus = self._solvers_api.inspect_job(
            self._solver.id, self._solver.version, job_id
        )
        # The reported progress is divided by 100 to yield a fraction.
        return float(status.progress / 100)

    def job_done(self, job_id: JobId) -> bool:
        """
        Job done

        Parameters:
        -----------
        job_id: str
            Job id

        Returns:
        --------
        A bool which is True if and only if the job is done
        """
        status: osparc.JobStatus = self._solvers_api.inspect_job(
            self._solver.id, self._solver.version, job_id
        )
        # A job counts as done once the status carries a stop timestamp.
        return status.stopped_at is not None

    def get_results(self, job_id: JobId) -> dict[str, Any]:
        """
        Get the results from a job

        Parameters:
        -----------
        job_id: str
            The job id

        Returns:
        --------
        A dictionary containing the results. File outputs are downloaded and
        returned as pathlib.Path objects; all other outputs are returned as-is.

        Raises:
        -------
        RuntimeError
            If the job is not done yet.
        """
        if not self.job_done(job_id):
            raise RuntimeError(f"The job with job_id={job_id} is not done yet.")

        outputs: osparc.JobOutputs = self._solvers_api.get_job_outputs(
            self._solver.id, self._solver.version, job_id
        )
        results: dict[str, Any] = {}
        for key, value in outputs.results.items():
            if isinstance(value, osparc.File):
                download_path: str = self._files_api.download_file(file_id=value.id)
                results[key] = Path(download_path)
            else:
                results[key] = value
        return results

    def get_job_log(self, job_id: JobId) -> TemporaryDirectory:
        """
        Get the logs from a job

        Parameters:
        -----------
        job_id: str
            The job id

        Returns:
        --------
        A tempfile.TemporaryDirectory holding the log files

        Raises:
        -------
        RuntimeError
            If the downloaded log archive is missing or is not a zip file.
        """
        logfile_path: str = self._solvers_api.get_job_output_logfile(
            self._solver.id, self._solver.version, job_id
        )
        if not Path(logfile_path).is_file():
            raise RuntimeError("Could not download logfiles")

        try:
            if not is_zipfile(logfile_path):
                raise RuntimeError("Could not download logfiles")
            tmp_dir = TemporaryDirectory()
            try:
                with ZipFile(logfile_path) as zf:
                    zf.extractall(tmp_dir.name)
            except BaseException:
                # Do not hand back (or leave behind) a half-extracted directory.
                tmp_dir.cleanup()
                raise
        finally:
            # The downloaded archive is never exposed to the caller, so remove
            # it whether or not validation/extraction succeeded.
            os.remove(logfile_path)
        return tmp_dir


class O2SparcService(ServiceBase):
    """Wraps osparc python client library and fulfills ServiceBase interface"""

    def __init__(self, config: ConfigDict | None = None, connect: bool = True) -> None:
        """Build the osparc client from environment variables and *config*.

        For each of ``host``, ``username`` and ``password`` the environment
        variable ``O2SPARC_<NAME>`` is consulted first, then the config key
        ``o2sparc_<name>``. Settings found in neither are left to the
        ``osparc.Configuration`` defaults.

        Parameters:
        -----------
        config : dict | configparser.SectionProxy | None
            Optional configuration mapping.
        connect : bool
            When True, :meth:`connect` is called at the end of initialization.
        """
        config = config or {}
        logging.info("Initializing o2sparc...")
        # Never write the API secret to the log: mask it in plain dicts.
        # Other config objects are logged via their repr, as before.
        loggable_config = self._mask_secrets(config) if isinstance(config, dict) else config
        logging.debug("config=%r", loggable_config)

        kwargs = {}
        for name in ("host", "username", "password"):
            env_name = f"O2SPARC_{name.upper()}"
            config_name = env_name.lower()
            # Environment takes precedence over the config entry.
            value = os.environ.get(env_name) or config.get(config_name)
            if value is not None:
                kwargs[name] = value
        logging.debug("Config arguments:%s", self._mask_secrets(kwargs))
        configuration = osparc.Configuration(**kwargs)

        # reuses profile-name from pennsieve to set debug mode
        profile_name = config.get("pennsieve_profile_name", "prod")
        configuration.debug = profile_name == "test"

        self._client = osparc.ApiClient(configuration=configuration)
        if connect:
            self.connect()

    @staticmethod
    def _mask_secrets(mapping: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of *mapping* with every password value replaced by ``"***"``."""
        return {
            key: "***" if "password" in str(key).lower() else value
            for key, value in mapping.items()
        }

    def connect(self) -> osparc.ApiClient:
        """Explicitly initializes client pool (not required)"""
        p = self._client.pool
        logging.debug("%s was initialized", p)
        return self._client

    def info(self) -> str:
        """Returns the version of osparc client."""
        # Takes the second "/"-separated field of the client's user agent.
        return self._client.user_agent.split("/")[1]

    def get_profile(self) -> UserNameStr:
        """Returns the current user profile.

        Returns:
        --------
        A string with username.
        """
        users_api = osparc.UsersApi(self._client)
        profile: Profile = users_api.get_my_profile()
        return profile.login

    def set_profile(self, username: str, password: str) -> UserNameStr:
        """Changes to a different user profile

        Parameters:
        -----------
        username :str
            API user key
        password :str
            API user secret

        Returns:
        --------
        A string with username.
        """
        cfg = self._client.configuration
        cfg.username = username
        cfg.password = password
        return self.get_profile()

    def close(self) -> None:
        """Closes the osparc client."""
        self._client.close()

    def get_solver(self, solver_key: str, solver_version: str) -> O2SparcSolver:
        """Get a computational service (solver) to which jobs can be submitted.

        Parameters:
        -----------
        solver_key :str
            Solver key
        solver_version :str
            Solver version

        Returns:
        --------
        A O2SparcSolver object, to which jobs can be submitted
        """
        return O2SparcSolver(self._client, solver_key, solver_version)