Source code for openprotein.jobs.schemas
import logging
from datetime import datetime
from enum import Enum
from typing import Union
from pydantic import BaseModel, ConfigDict, TypeAdapter
from requests import Response
from typing_extensions import Self
logger = logging.getLogger(__name__)
class JobType(str, Enum):
"""
Type of job.
Describes the types of jobs that can be done.
"""
stub = "stub"
workflow_preprocess = "/workflow/preprocess"
workflow_train = "/workflow/train"
workflow_embed_umap = "/workflow/embed/umap"
workflow_predict = "/workflow/predict"
workflow_predict_single_site = "/workflow/predict/single_site"
workflow_crossvalidate = "/workflow/crossvalidate"
workflow_evaluate = "/workflow/evaluate"
workflow_design = "/workflow/design"
align_align = "/align/align"
align_prompt = "/align/prompt"
clustalo = "/align/clustalo"
mafft = "/align/mafft"
abnumber = "/align/abnumber"
poet = "/poet"
poet_score = "/poet/score"
poet_single_site = "/poet/single_site"
poet_generate = "/poet/generate"
poet_score_indel = "/poet/score/indel"
embeddings_embed = "/embeddings/embed"
embeddings_svd = "/embeddings/svd"
embeddings_attn = "/embeddings/attn"
embeddings_logits = "/embeddings/logits"
embeddings_embed_reduced = "/embeddings/embed_reduced"
svd_fit = "/svd/fit"
svd_embed = "/svd/embed"
umap_fit = "/umap/fit"
umap_embed = "/umap/embed"
embeddings_fold = "/embeddings/fold"
# predictor jobs
predictor_train = "/predictor/train"
predictor_predict = "/predictor/predict"
predictor_crossvalidate = "/predictor/crossvalidate"
predictor_predict_single_site = "/predictor/predict_single_site"
predictor_predict_multi = "/predictor/predict_multi"
predictor_predict_multi_single_site = "/predictor/predict_multi_single_site"
# designer
designer = "/design"
class JobStatus(str, Enum):
PENDING = "PENDING"
RUNNING = "RUNNING"
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"
RETRYING = "RETRYING"
CANCELED = "CANCELED"
def done(self):
return (
(self is self.SUCCESS) or (self is self.FAILURE) or (self is self.CANCELED)
) # noqa: E501
def cancelled(self):
return self is self.CANCELED
[docs]
class Job(BaseModel):
job_id: str
# new emb service get doesnt have job_type
job_type: str
status: JobStatus
created_date: datetime
start_date: datetime | None = None
end_date: datetime | None = None
prerequisite_job_id: str | None = None
progress_message: str | None = None
progress_counter: int | None = None
sequence_length: int | None = None
@classmethod
def create(cls, obj: "Job | Response | dict", **kwargs) -> Self:
# parse specific child Job from base Job or Response
try:
# try to parse as subclass job
# get dict form
d = (
obj.json()
if isinstance(obj, Response)
else obj.model_dump() if isinstance(obj, Job) else obj
)
job_classes = Job.__subclasses__()
job = TypeAdapter(Union[tuple(job_classes)]).validate_python(d | kwargs) # type: ignore
except Exception as e:
job = Job.model_validate(d | kwargs)
return job # type: ignore - static checker cannot know runtime type
# hide extra allowed fields
def __repr_args__(self):
for k, v in self.__dict__.items():
field = self.model_fields.get(k)
if field and field.repr:
yield k, v
yield from (
(k, getattr(self, k))
for k, v in self.model_computed_fields.items()
if v.repr
)
# allows to carry over subclassed job fields when factory creating
model_config = ConfigDict(extra="allow")
class BatchJob(BaseModel):
num_records: int | None = None