Source code for openprotein.jobs.schemas

import logging
from datetime import datetime
from enum import Enum
from typing import Union

from pydantic import BaseModel, ConfigDict, TypeAdapter
from requests import Response
from typing_extensions import Self

logger = logging.getLogger(__name__)


class JobType(str, Enum):
    """
    Enumeration of the known job types.

    Every member is a ``str`` whose value is the identifier of the job kind
    (most of them look like a route, e.g. ``"/poet/score"``).
    """

    # placeholder
    stub = "stub"

    # workflow
    workflow_preprocess = "/workflow/preprocess"
    workflow_train = "/workflow/train"
    workflow_embed_umap = "/workflow/embed/umap"
    workflow_predict = "/workflow/predict"
    workflow_predict_single_site = "/workflow/predict/single_site"
    workflow_crossvalidate = "/workflow/crossvalidate"
    workflow_evaluate = "/workflow/evaluate"
    workflow_design = "/workflow/design"

    # align
    align_align = "/align/align"
    align_prompt = "/align/prompt"
    clustalo = "/align/clustalo"
    mafft = "/align/mafft"
    abnumber = "/align/abnumber"

    # poet
    poet = "/poet"
    poet_score = "/poet/score"
    poet_single_site = "/poet/single_site"
    poet_generate = "/poet/generate"
    poet_score_indel = "/poet/score/indel"

    # embeddings
    embeddings_embed = "/embeddings/embed"
    embeddings_svd = "/embeddings/svd"
    embeddings_attn = "/embeddings/attn"
    embeddings_logits = "/embeddings/logits"
    embeddings_embed_reduced = "/embeddings/embed_reduced"

    # svd
    svd_fit = "/svd/fit"
    svd_embed = "/svd/embed"

    # umap
    umap_fit = "/umap/fit"
    umap_embed = "/umap/embed"

    # fold (lives under the embeddings prefix)
    embeddings_fold = "/embeddings/fold"

    # predictor
    predictor_train = "/predictor/train"
    predictor_predict = "/predictor/predict"
    predictor_crossvalidate = "/predictor/crossvalidate"
    predictor_predict_single_site = "/predictor/predict_single_site"
    predictor_predict_multi = "/predictor/predict_multi"
    predictor_predict_multi_single_site = "/predictor/predict_multi_single_site"

    # designer
    designer = "/design"


class JobStatus(str, Enum):
    """
    Status of a job.

    Members are ``str`` instances whose value equals their name.
    """

    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
    RETRYING = "RETRYING"
    CANCELED = "CANCELED"

    def done(self) -> bool:
        """
        Return whether this status is terminal.

        Returns
        -------
        bool
            True for ``SUCCESS``, ``FAILURE`` and ``CANCELED``; False otherwise.
        """
        # Look members up on the class rather than through ``self``: accessing
        # one member from another is discouraged by the enum documentation and
        # emits a DeprecationWarning on Python 3.11.
        status_cls = type(self)
        return (
            self is status_cls.SUCCESS
            or self is status_cls.FAILURE
            or self is status_cls.CANCELED
        )

    def cancelled(self) -> bool:
        """
        Return whether this status is ``CANCELED``.

        Returns
        -------
        bool
            True only for the ``CANCELED`` member.
        """
        return self is type(self).CANCELED


class Job(BaseModel):
    """
    Base schema for a job record.

    Extra fields are allowed so that fields belonging to subclassed jobs are
    carried over when a job is built through :meth:`create`.
    """

    # allows to carry over subclassed job fields when factory creating
    model_config = ConfigDict(extra="allow")

    job_id: str
    # new emb service get doesnt have job_type
    job_type: str
    status: JobStatus
    created_date: datetime
    start_date: datetime | None = None
    end_date: datetime | None = None
    prerequisite_job_id: str | None = None
    progress_message: str | None = None
    progress_counter: int | None = None
    sequence_length: int | None = None

    @classmethod
    def create(cls, obj: "Job | Response | dict", **kwargs) -> Self:
        """
        Build a job from a base ``Job``, a ``Response`` or a plain dict.

        The input is first reduced to a dict and merged with ``kwargs``
        (``kwargs`` win on key clashes). Validation is then attempted against
        the union of the direct subclasses of ``Job``; if that fails for any
        reason the data is validated as a plain ``Job`` instead.

        Parameters
        ----------
        obj : Job | Response | dict
            Source of the job data. A ``Response`` is decoded with ``.json()``,
            a ``Job`` is dumped with ``.model_dump()``, anything else is used
            as is.
        **kwargs
            Extra fields merged on top of the data taken from ``obj``.

        Returns
        -------
        Job
            An instance of a ``Job`` subclass when one validates, otherwise a
            base ``Job``.

        Raises
        ------
        Exception
            Errors from decoding ``obj`` or from the final ``Job`` validation
            are propagated to the caller.
        """
        # Normalise to a dict *before* the try block: if decoding fails the
        # real error must surface, instead of the fallback branch tripping
        # over an unassigned local.
        if isinstance(obj, Response):
            data = obj.json()
        elif isinstance(obj, Job):
            data = obj.model_dump()
        else:
            data = obj
        payload = data | kwargs

        try:
            # try to parse as subclass job
            job_classes = Job.__subclasses__()
            job = TypeAdapter(Union[tuple(job_classes)]).validate_python(payload)  # type: ignore
        except Exception:
            # Deliberate best-effort: no subclass matched (or none exist, in
            # which case building the Union itself fails) -> use the base model.
            job = Job.model_validate(payload)
        return job  # type: ignore - static checker cannot know runtime type

    # hide extra allowed fields
    def __repr_args__(self):
        """Yield ``(name, value)`` pairs for declared and computed fields with ``repr`` enabled."""
        for k, v in self.__dict__.items():
            field = self.model_fields.get(k)
            if field and field.repr:
                yield k, v
        yield from (
            (k, getattr(self, k))
            for k, v in self.model_computed_fields.items()
            if v.repr
        )
class BatchJob(BaseModel):
    # Schema with a single optional field.

    # record count; defaults to None when not supplied
    num_records: int | None = None