Module lore.types.conversion
import json
import time
from pathlib import Path
from typing import Any, Union
import datasets as hf
from fastapi.encoders import jsonable_encoder
import lore.orm as orm
import lore.types.base_types as bt
import lore.types.enums as bte
from lore.utils.dataset import get_dataset_size, update_dataset_info

def to_project(p: dict) -> bt.Project:
    assert "workspaceId" in p, str(p)
    return bt.Project(id=p["id"], name=p["name"], workspace_id=p["workspaceId"])

def to_hf_dataset(dataset: bt.Dataset) -> hf.DatasetDict:
    return hf.load_from_disk(dataset.storage_path)

def to_replaced_dataset(
    dataset: bt.Dataset, hf_dataset: Union[hf.DatasetDict, hf.Dataset], data_dir: Path
) -> bt.Dataset:
    if isinstance(hf_dataset, hf.DatasetDict):
        for split in hf_dataset:
            hf_dataset[split] = update_dataset_info(hf_dataset[split])
        features: Union[dict[Any, str], str] = {
            split: str(hf_dataset[split].features) for split in hf_dataset
        }
        num_rows = sum(hf_dataset.num_rows.values())
    else:
        hf_dataset = update_dataset_info(hf_dataset)
        features = str(hf_dataset.features)
        num_rows = hf_dataset.num_rows
    hf_dataset.save_to_disk(str(data_dir))
    kwargs = {
        "name": dataset.name,
        "workspace_id": dataset.workspace_id,
        "creation_time": time.time(),
        "task_type": dataset.task_type if hasattr(dataset, "task_type") else "",
        "data_type": dataset.data_type if hasattr(dataset, "data_type") else [],
        "storage_path": str(data_dir),
        "features": json.dumps(features),
        "source": dataset.source if hasattr(dataset, "source") else "",
        "row_count": num_rows,
        "size": get_dataset_size(hf_dataset, str(data_dir)),
        "registered_in_genai": False,
        "description": dataset.description,
    }
    return bt.Dataset(**kwargs)

def serialize_result(data):
    return jsonable_encoder(data)

def create_from_celery_result(r) -> bt.AsyncResponse:
    task_result = None
    task_error = None
    task_id = r.task_id
    task_ready = r.ready()
    task_successful = r.successful()
    task_status = r.state
    if task_ready:
        task_status = r.state  # Re-read the state so the reported status reflects the final value.
        if task_successful:
            task_result = r.get()
        else:
            task_error = r.get()
    return bt.AsyncResponse(
        task_id=task_id,
        status=task_status,
        ready=task_ready,
        successful=task_successful,
        result=serialize_result(task_result),
        error=serialize_result(task_error),
    )

def to_resource_pool_metadata(w: dict) -> bt.ResourcePoolMetadata:
    assert "gpu_type" in w, "gpu_type missing from ResourcePoolMetadata"
    assert "max_agents" in w, "max_agents missing from ResourcePoolMetadata"
    gpu_type = bte.GPUType.from_string(w["gpu_type"])
    assert (
        gpu_type is not None
    ), f"Could not find GPUType for '{w['gpu_type']}'. Choices: {[t.value for t in bte.GPUType]}"
    return bt.ResourcePoolMetadata(gpu_type=gpu_type, max_agents=w["max_agents"])
Functions
def create_from_celery_result(r) ‑> AsyncResponse
Build an AsyncResponse from a Celery AsyncResult, capturing its status, readiness, result, and error.
def create_from_celery_result(r) -> bt.AsyncResponse:
    task_result = None
    task_error = None
    task_id = r.task_id
    task_ready = r.ready()
    task_successful = r.successful()
    task_status = r.state
    if task_ready:
        task_status = r.state  # Re-read the state so the reported status reflects the final value.
        if task_successful:
            task_result = r.get()
        else:
            task_error = r.get()
    return bt.AsyncResponse(
        task_id=task_id,
        status=task_status,
        ready=task_ready,
        successful=task_successful,
        result=serialize_result(task_result),
        error=serialize_result(task_error),
    )
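
A hedged usage sketch: the Celery app configuration and the task id below are assumptions about the surrounding service, not something this module defines.

from celery import Celery
from celery.result import AsyncResult

from lore.types.conversion import create_from_celery_result

# Hypothetical broker/backend configuration; the real service wiring may differ.
app = Celery(broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")

# Wrap a previously submitted task by id (the id here is illustrative).
async_result = AsyncResult("some-task-id", app=app)
response = create_from_celery_result(async_result)
print(response.status, response.ready, response.successful)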
def serialize_result(data)
Encode arbitrary task data into JSON-compatible structures using FastAPI's jsonable_encoder.
def serialize_result(data):
    return jsonable_encoder(data)
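
A small illustrative sketch of what jsonable_encoder does with a non-JSON-native object; the dataclass here is hypothetical.

from dataclasses import dataclass

from lore.types.conversion import serialize_result

@dataclass
class TaskOutput:  # hypothetical result payload
    accuracy: float
    tags: tuple

print(serialize_result(TaskOutput(accuracy=0.93, tags=("eval",))))
# -> {'accuracy': 0.93, 'tags': ['eval']}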
def to_hf_dataset(dataset: Dataset) ‑> datasets.dataset_dict.DatasetDict
Load the Hugging Face dataset saved at the record's storage_path.
def to_hf_dataset(dataset: bt.Dataset) -> hf.DatasetDict:
    return hf.load_from_disk(dataset.storage_path)
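
Illustrative only: to_hf_dataset is a thin wrapper around hf.load_from_disk, so any directory produced by save_to_disk round-trips. The path below is a placeholder.

import datasets as hf

ds = hf.Dataset.from_dict({"text": ["a", "b"], "label": [0, 1]})
hf.DatasetDict({"train": ds}).save_to_disk("/tmp/example_ds")  # placeholder path

# Equivalent to to_hf_dataset(dataset) when dataset.storage_path == "/tmp/example_ds".
loaded = hf.load_from_disk("/tmp/example_ds")
print(loaded["train"].num_rows)  # 2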
def to_project(p: dict) ‑> Project
Convert a raw project dict (camelCase keys) into a Project.
def to_project(p: dict) -> bt.Project:
    assert "workspaceId" in p, str(p)
    return bt.Project(id=p["id"], name=p["name"], workspace_id=p["workspaceId"])
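
A minimal usage sketch; the id, name, and workspaceId values are illustrative.

from lore.types.conversion import to_project

payload = {"id": "proj-123", "name": "demo-project", "workspaceId": "ws-1"}
project = to_project(payload)
print(project.workspace_id)  # "ws-1"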
def to_replaced_dataset(dataset: Dataset, hf_dataset: Union[datasets.arrow_dataset.Dataset, datasets.dataset_dict.DatasetDict], data_dir: pathlib.Path) ‑> Dataset
Persist hf_dataset to data_dir and return a new Dataset record describing the replaced data.
def to_replaced_dataset(
    dataset: bt.Dataset, hf_dataset: Union[hf.DatasetDict, hf.Dataset], data_dir: Path
) -> bt.Dataset:
    if isinstance(hf_dataset, hf.DatasetDict):
        for split in hf_dataset:
            hf_dataset[split] = update_dataset_info(hf_dataset[split])
        features: Union[dict[Any, str], str] = {
            split: str(hf_dataset[split].features) for split in hf_dataset
        }
        num_rows = sum(hf_dataset.num_rows.values())
    else:
        hf_dataset = update_dataset_info(hf_dataset)
        features = str(hf_dataset.features)
        num_rows = hf_dataset.num_rows
    hf_dataset.save_to_disk(str(data_dir))
    kwargs = {
        "name": dataset.name,
        "workspace_id": dataset.workspace_id,
        "creation_time": time.time(),
        "task_type": dataset.task_type if hasattr(dataset, "task_type") else "",
        "data_type": dataset.data_type if hasattr(dataset, "data_type") else [],
        "storage_path": str(data_dir),
        "features": json.dumps(features),
        "source": dataset.source if hasattr(dataset, "source") else "",
        "row_count": num_rows,
        "size": get_dataset_size(hf_dataset, str(data_dir)),
        "registered_in_genai": False,
        "description": dataset.description,
    }
    return bt.Dataset(**kwargs)
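
A minimal sketch of the intended flow, assuming an existing Dataset record named dataset; the filter step and the target directory are illustrative, not part of this module.

from pathlib import Path

from lore.types.conversion import to_hf_dataset, to_replaced_dataset

hf_dataset = to_hf_dataset(dataset)  # `dataset` is an existing bt.Dataset record

# Hypothetical transformation: drop rows with an empty "text" column.
filtered = hf_dataset.filter(lambda row: bool(row["text"]))

new_record = to_replaced_dataset(dataset, filtered, Path("/data/replaced"))  # placeholder dir
print(new_record.row_count, new_record.storage_path)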
def to_resource_pool_metadata(w: dict) ‑> ResourcePoolMetadata
Validate a resource-pool dict and convert it into ResourcePoolMetadata.
def to_resource_pool_metadata(w: dict) -> bt.ResourcePoolMetadata:
    assert "gpu_type" in w, "gpu_type missing from ResourcePoolMetadata"
    assert "max_agents" in w, "max_agents missing from ResourcePoolMetadata"
    gpu_type = bte.GPUType.from_string(w["gpu_type"])
    assert (
        gpu_type is not None
    ), f"Could not find GPUType for '{w['gpu_type']}'. Choices: {[t.value for t in bte.GPUType]}"
    return bt.ResourcePoolMetadata(gpu_type=gpu_type, max_agents=w["max_agents"])
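
A small sketch; the "A100" value assumes a matching entry in the GPUType enum and is only an example.

from lore.types.conversion import to_resource_pool_metadata

pool = {"gpu_type": "A100", "max_agents": 4}  # values are illustrative
metadata = to_resource_pool_metadata(pool)
print(metadata.gpu_type, metadata.max_agents)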