Module lore.types.conversion
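
Converters between lore's base types and their external representations: raw
API dicts, Hugging Face datasets on disk, and Celery task results.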

Source code
import json
import time
from pathlib import Path
from typing import Any, Union

import datasets as hf
from fastapi.encoders import jsonable_encoder

import lore.orm as orm
import lore.types.base_types as bt
import lore.types.enums as bte
from lore.utils.dataset import get_dataset_size, update_dataset_info


def to_project(p: dict) -> bt.Project:
    assert "workspaceId" in p, str(p)
    return bt.Project(id=p["id"], name=p["name"], workspace_id=p["workspaceId"])


def to_hf_dataset(dataset: bt.Dataset) -> hf.DatasetDict:
    return hf.load_from_disk(dataset.storage_path)


def to_replaced_dataset(
    dataset: bt.Dataset, hf_dataset: Union[hf.DatasetDict, hf.Dataset], data_dir: Path
) -> bt.Dataset:
    if isinstance(hf_dataset, hf.DatasetDict):
        for split in hf_dataset:
            hf_dataset[split] = update_dataset_info(hf_dataset[split])
        features: Union[dict[Any, str], str] = {
            split: str(hf_dataset[split].features) for split in hf_dataset
        }
        num_rows = sum(hf_dataset.num_rows.values())
    else:
        hf_dataset = update_dataset_info(hf_dataset)
        features = str(hf_dataset.features)
        num_rows = hf_dataset.num_rows

    hf_dataset.save_to_disk(str(data_dir))

    kwargs = {
        "name": dataset.name,
        "workspace_id": dataset.workspace_id,
        "creation_time": time.time(),
        "task_type": dataset.task_type if hasattr(dataset, "task_type") else "",
        "data_type": dataset.data_type if hasattr(dataset, "data_type") else [],
        "storage_path": str(data_dir),
        "features": json.dumps(features),
        "source": dataset.source if hasattr(dataset, "source") else "",
        "row_count": num_rows,
        "size": get_dataset_size(hf_dataset, str(data_dir)),
        "registered_in_genai": False,
        "description": dataset.description,
    }

    return bt.Dataset(**kwargs)


def serialize_result(data):
    return jsonable_encoder(data)


def create_from_celery_result(r) -> bt.AsyncResponse:
    task_result = None
    task_error = None
    task_id = r.task_id
    task_ready = r.ready()
    task_successful = r.successful()
    task_status = r.state
    if task_ready:
        task_status = r.state  # Re-read: the state may have advanced since the first check.
        if task_successful:
            task_result = r.get()
        else:
            task_error = r.get(propagate=False)  # return the exception instead of re-raising it
    return bt.AsyncResponse(
        task_id=task_id,
        status=task_status,
        ready=task_ready,
        successful=task_successful,
        result=serialize_result(task_result),
        error=serialize_result(task_error),
    )


def to_resource_pool_metadata(w: dict) -> bt.ResourcePoolMetadata:
    assert "gpu_type" in w, "gpu_type missing from ResourcePoolMetadata"
    assert "max_agents" in w, "max_agents missing from ResourcePoolMetadata"
    gpu_type = bte.GPUType.from_string(w["gpu_type"])
    assert (
        gpu_type is not None
    ), f"Could not find GPUType for '{w['gpu_type']}'. Choices: {[t.value for t in bte.GPUType]}"
    return bt.ResourcePoolMetadata(gpu_type=gpu_type, max_agents=w["max_agents"])

Functions

def create_from_celery_result(r) -> AsyncResponse
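
Snapshots a Celery AsyncResult into a bt.AsyncResponse, pulling the result or
error payload once the task has finished. A minimal usage sketch; the task id
is a placeholder and a configured Celery app is assumed:

from celery.result import AsyncResult

import lore.types.conversion as conv

r = AsyncResult("<task-id>")  # placeholder id; resolves against the Celery result backend
response = conv.create_from_celery_result(r)
print(response.status, response.ready, response.successful)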
def serialize_result(data)
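
A thin wrapper over FastAPI's jsonable_encoder. A quick sketch of what the
encoding buys you; the payload is invented:

from datetime import datetime

import lore.types.conversion as conv

# Non-JSON-native values (datetimes, Paths, Pydantic models) become
# JSON-compatible ones.
payload = conv.serialize_result({"created": datetime(2024, 1, 1), "count": 3})
# payload == {"created": "2024-01-01T00:00:00", "count": 3}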
def to_hf_dataset(dataset: Dataset) -> datasets.dataset_dict.DatasetDict
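
Loads the Hugging Face dataset stored at dataset.storage_path. A sketch,
assuming ds is an existing bt.Dataset whose storage_path was previously
written with save_to_disk:

import lore.types.conversion as conv

hf_ds = conv.to_hf_dataset(ds)  # ds: an existing bt.Dataset record (assumed)
print(hf_ds)  # splits, features, and row counts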
def to_project(p: dict) -> Project
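
Builds a bt.Project from a raw API dict; the camelCase workspaceId key is
required. For example (field values are made up):

import lore.types.conversion as conv

project = conv.to_project({"id": "p1", "name": "demo", "workspaceId": "w1"})
assert project.workspace_id == "w1"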
def to_replaced_dataset(dataset: Dataset, hf_dataset: Union[datasets.arrow_dataset.Dataset, datasets.dataset_dict.DatasetDict], data_dir: pathlib.Path) -> Dataset
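
Saves hf_dataset under data_dir and returns a fresh bt.Dataset record
describing it (features, row count, size), carrying name and workspace
metadata over from the original. A sketch of a typical replace flow; old is
assumed to be an existing bt.Dataset, and the column name and output path are
invented:

from pathlib import Path

import lore.types.conversion as conv

hf_ds = conv.to_hf_dataset(old)  # old: an existing bt.Dataset record (assumed)
filtered = hf_ds.filter(lambda row: row["text"] != "")  # "text" is a hypothetical column
new = conv.to_replaced_dataset(old, filtered, Path("/data/replaced"))
print(new.row_count, new.size)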
def to_resource_pool_metadata(w: dict) -> ResourcePoolMetadata
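
Validates the raw dict and resolves gpu_type through bte.GPUType.from_string,
raising an AssertionError on missing keys or unknown values. A sketch; "A100"
stands in for whatever values bte.GPUType actually defines:

import lore.types.conversion as conv

meta = conv.to_resource_pool_metadata({"gpu_type": "A100", "max_agents": 4})
print(meta.gpu_type, meta.max_agents)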