Module pachyderm_sdk.config

Functionality for parsing Pachyderm config files.

Expand source code
"""Functionality for parsing Pachyderm config files."""

import json
import os
import uuid
from base64 import b64decode
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Dict, Optional, Union

from .errors import ConfigError


class ConfigFile:
    """A parsed Pachyderm config file."""

    def __init__(self, data: dict):
        """
        Parameters
        ----------
        data : dict (JSON)
            The config file data. The only required fields are:
            {
              "v2": {
                "active_context": ...,
                "contexts": {...}
              }
            }
        """
        if (v2 := data.get("v2")) is None or not (
            "active_context" in v2 and "contexts" in v2
        ):
            raise ValueError(
                "Missing required fields v2.active_context and/or v2.contexts."
            )
        self._config_file_data = data

    @classmethod
    def from_path(cls, config_file: Union[Path, str]) -> "ConfigFile":
        """Parse a Pachyderm config file from a local file.

        Parameters
        ----------
        config_file : str
            The path to the config file.
        """
        config_file = Path(os.path.expanduser(config_file)).resolve()
        return cls(data=json.loads(config_file.read_bytes()))

    @classmethod
    def new_with_context(cls, name: str, context: "Context") -> "ConfigFile":
        """Create a new ConfigFile object from a single context.
        This will also set this specified context as the active context.

        Parameters
        ----------
        name : str
            The name of the context.
        context : Context
            The context object.
        """
        data = dict(
            user_id=str(uuid.uuid4()).replace("-", ""),
            v2=dict(
                active_context=name,
                contexts={name: asdict(context)},
                metrics=True,
            ),
        )
        return cls(data)

    @property
    def user_id(self) -> Optional[str]:
        """The user ID of the config file."""
        return self._config_file_data.get("user_id")

    @property
    def active_context(self) -> "Context":
        """The currently-active context."""
        active_context_name = self._config_file_data["v2"]["active_context"]
        contexts = self._config_file_data["v2"]["contexts"]
        if active_context_name not in contexts:
            raise ConfigError(f"active context not found: {active_context_name}")
        return Context(**contexts[active_context_name])

    @property
    def active_enterprise_context(self) -> "Context":
        """The currently-active context that is enterprise-enabled."""
        context_name = self._config_file_data["v2"].get("active_enterprise_context")
        if context_name is None:
            raise ConfigError("active enterprise context is not specified")
        contexts = self._config_file_data["v2"]["contexts"]
        if context_name not in contexts:
            raise ConfigError(f"active enterprise context not found: {context_name}")
        return Context(**contexts[context_name])

    def add_context(
        self, name: str, context: "Context", *, overwrite: bool = False
    ) -> None:
        """Add a context to the parsed config.
        The context name must be unique, unless overwrite is True.

        Parameters
        ----------
        name : str
            The name of the context.
        context : Context
            The context object.
        overwrite : bool (kwarg)
            Whether to overwrite an existing context.
        """
        contexts = self._config_file_data["v2"]["contexts"]
        if (name in contexts) and not overwrite:
            raise ValueError(f'context "{name}" already exists')
        contexts[name] = asdict(context)

    def set_active_context(self, name: str) -> None:
        """Set the active context to the specified name.
        The context must exist, else a ValueError is raise.

        Parameters
        ----------
        name : str
            The name of the context.
        """
        contexts = self._config_file_data["v2"]["contexts"]
        if name not in contexts:
            raise ValueError(f'context "{name}" does not exist')
        self._config_file_data["v2"]["active_context"] = name

    def write(self, config_file: Union[Path, str]) -> None:
        """Write the current config data to the specified file.

        Parameters
        ----------
        config_file : str
            The path to the config file.
        """
        config_file = Path(os.path.expanduser(config_file)).resolve()
        with config_file.open("w") as file_out:
            file_out.write(json.dumps(self._config_file_data, indent=2))
            file_out.write("\n")


@dataclass
class Context:
    """A context contains all the information needed to connect to a pachyderm
    instance. Not all fields need to be present for the context to be valid."""

    source: Optional[int] = None
    """An integer that specifies where the config came from. 
    This parameter is for internal use only and should not be modified."""

    pachd_address: Optional[str] = None
    """A host:port specification for connecting to pachd."""

    server_cas: Optional[str] = None
    """Trusted root certificates for the cluster, formatted as a 
    base64-encoded PEM. This is only set when TLS is enabled."""

    session_token: Optional[str] = None
    """A secret token identifying the current user within their pachyderm
    cluster. This is included in all RPCs and used to determine if a user's
    actions are authorized. This is only set when auth is enabled."""

    active_transaction: Optional[str] = None
    """The currently active transaction for batching together commands."""

    cluster_name: Optional[str] = None
    """The name of the underlying Kubernetes cluster."""

    auth_info: Optional[str] = None
    """The name of the underlying Kubernetes cluster’s auth credentials"""

    namespace: Optional[str] = None
    """The underlying Kubernetes cluster’s namespace"""

    cluster_deployment_id: Optional[str] = None
    """The pachyderm cluster deployment ID that is used to ensure the
    operations run on the expected cluster."""

    project: Optional[str] = None

    enterprise_server: bool = False
    """Whether the context represents an enterprise server."""

    port_forwarders: Dict[str, int] = None
    """A mapping of service name -> local port."""

    @property
    def active_pachd_address(self) -> str:
        """This pachd address factors in port-forwarding."""
        if self.pachd_address is None:
            port = 30650
            if self.port_forwarders:
                port = self.port_forwarders.get("pachd", 30650)
            return f"grpc://localhost:{port}"
        return self.pachd_address

    @property
    def server_cas_decoded(self) -> Optional[bytes]:
        """The base64 decoded root certificates in PEM format, if they exist."""
        if self.server_cas:
            return b64decode(bytes(self.server_cas, "utf-8"))

Classes

class ConfigFile (data: dict)

A parsed Pachyderm config file.

Parameters

data : dict (JSON)
The config file data. The only required fields are: { "v2": { "active_context": …, "contexts": {…} } }
Expand source code
class ConfigFile:
    """A parsed Pachyderm config file."""

    def __init__(self, data: dict):
        """
        Parameters
        ----------
        data : dict (JSON)
            The config file data. The only required fields are:
            {
              "v2": {
                "active_context": ...,
                "contexts": {...}
              }
            }
        """
        if (v2 := data.get("v2")) is None or not (
            "active_context" in v2 and "contexts" in v2
        ):
            raise ValueError(
                "Missing required fields v2.active_context and/or v2.contexts."
            )
        self._config_file_data = data

    @classmethod
    def from_path(cls, config_file: Union[Path, str]) -> "ConfigFile":
        """Parse a Pachyderm config file from a local file.

        Parameters
        ----------
        config_file : str
            The path to the config file.
        """
        config_file = Path(os.path.expanduser(config_file)).resolve()
        return cls(data=json.loads(config_file.read_bytes()))

    @classmethod
    def new_with_context(cls, name: str, context: "Context") -> "ConfigFile":
        """Create a new ConfigFile object from a single context.
        This will also set this specified context as the active context.

        Parameters
        ----------
        name : str
            The name of the context.
        context : Context
            The context object.
        """
        data = dict(
            user_id=str(uuid.uuid4()).replace("-", ""),
            v2=dict(
                active_context=name,
                contexts={name: asdict(context)},
                metrics=True,
            ),
        )
        return cls(data)

    @property
    def user_id(self) -> Optional[str]:
        """The user ID of the config file."""
        return self._config_file_data.get("user_id")

    @property
    def active_context(self) -> "Context":
        """The currently-active context."""
        active_context_name = self._config_file_data["v2"]["active_context"]
        contexts = self._config_file_data["v2"]["contexts"]
        if active_context_name not in contexts:
            raise ConfigError(f"active context not found: {active_context_name}")
        return Context(**contexts[active_context_name])

    @property
    def active_enterprise_context(self) -> "Context":
        """The currently-active context that is enterprise-enabled."""
        context_name = self._config_file_data["v2"].get("active_enterprise_context")
        if context_name is None:
            raise ConfigError("active enterprise context is not specified")
        contexts = self._config_file_data["v2"]["contexts"]
        if context_name not in contexts:
            raise ConfigError(f"active enterprise context not found: {context_name}")
        return Context(**contexts[context_name])

    def add_context(
        self, name: str, context: "Context", *, overwrite: bool = False
    ) -> None:
        """Add a context to the parsed config.
        The context name must be unique, unless overwrite is True.

        Parameters
        ----------
        name : str
            The name of the context.
        context : Context
            The context object.
        overwrite : bool (kwarg)
            Whether to overwrite an existing context.
        """
        contexts = self._config_file_data["v2"]["contexts"]
        if (name in contexts) and not overwrite:
            raise ValueError(f'context "{name}" already exists')
        contexts[name] = asdict(context)

    def set_active_context(self, name: str) -> None:
        """Set the active context to the specified name.
        The context must exist, else a ValueError is raise.

        Parameters
        ----------
        name : str
            The name of the context.
        """
        contexts = self._config_file_data["v2"]["contexts"]
        if name not in contexts:
            raise ValueError(f'context "{name}" does not exist')
        self._config_file_data["v2"]["active_context"] = name

    def write(self, config_file: Union[Path, str]) -> None:
        """Write the current config data to the specified file.

        Parameters
        ----------
        config_file : str
            The path to the config file.
        """
        config_file = Path(os.path.expanduser(config_file)).resolve()
        with config_file.open("w") as file_out:
            file_out.write(json.dumps(self._config_file_data, indent=2))
            file_out.write("\n")

Static methods

def from_path(config_file: Union[pathlib.Path, str]) ‑> ConfigFile

Parse a Pachyderm config file from a local file.

Parameters

config_file : str
The path to the config file.
Expand source code
@classmethod
def from_path(cls, config_file: Union[Path, str]) -> "ConfigFile":
    """Parse a Pachyderm config file from a local file.

    Parameters
    ----------
    config_file : str
        The path to the config file.
    """
    config_file = Path(os.path.expanduser(config_file)).resolve()
    return cls(data=json.loads(config_file.read_bytes()))
def new_with_context(name: str, context: Context) ‑> ConfigFile

Create a new ConfigFile object from a single context. This will also set this specified context as the active context.

Parameters

name : str
The name of the context.
context : Context
The context object.
Expand source code
@classmethod
def new_with_context(cls, name: str, context: "Context") -> "ConfigFile":
    """Create a new ConfigFile object from a single context.
    This will also set this specified context as the active context.

    Parameters
    ----------
    name : str
        The name of the context.
    context : Context
        The context object.
    """
    data = dict(
        user_id=str(uuid.uuid4()).replace("-", ""),
        v2=dict(
            active_context=name,
            contexts={name: asdict(context)},
            metrics=True,
        ),
    )
    return cls(data)

Instance variables

var user_id : Optional[str]

The user ID of the config file.

Expand source code
@property
def user_id(self) -> Optional[str]:
    """The user ID of the config file."""
    return self._config_file_data.get("user_id")
var active_contextContext

The currently-active context.

Expand source code
@property
def active_context(self) -> "Context":
    """The currently-active context."""
    active_context_name = self._config_file_data["v2"]["active_context"]
    contexts = self._config_file_data["v2"]["contexts"]
    if active_context_name not in contexts:
        raise ConfigError(f"active context not found: {active_context_name}")
    return Context(**contexts[active_context_name])
var active_enterprise_contextContext

The currently-active context that is enterprise-enabled.

Expand source code
@property
def active_enterprise_context(self) -> "Context":
    """The currently-active context that is enterprise-enabled."""
    context_name = self._config_file_data["v2"].get("active_enterprise_context")
    if context_name is None:
        raise ConfigError("active enterprise context is not specified")
    contexts = self._config_file_data["v2"]["contexts"]
    if context_name not in contexts:
        raise ConfigError(f"active enterprise context not found: {context_name}")
    return Context(**contexts[context_name])

Methods

def add_context(self, name: str, context: Context, *, overwrite: bool = False) ‑> None

Add a context to the parsed config. The context name must be unique, unless overwrite is True.

Parameters

name : str
The name of the context.
context : Context
The context object.
overwrite : bool (kwarg)
Whether to overwrite an existing context.
Expand source code
def add_context(
    self, name: str, context: "Context", *, overwrite: bool = False
) -> None:
    """Add a context to the parsed config.
    The context name must be unique, unless overwrite is True.

    Parameters
    ----------
    name : str
        The name of the context.
    context : Context
        The context object.
    overwrite : bool (kwarg)
        Whether to overwrite an existing context.
    """
    contexts = self._config_file_data["v2"]["contexts"]
    if (name in contexts) and not overwrite:
        raise ValueError(f'context "{name}" already exists')
    contexts[name] = asdict(context)
def set_active_context(self, name: str) ‑> None

Set the active context to the specified name. The context must exist, else a ValueError is raise.

Parameters

name : str
The name of the context.
Expand source code
def set_active_context(self, name: str) -> None:
    """Set the active context to the specified name.
    The context must exist, else a ValueError is raise.

    Parameters
    ----------
    name : str
        The name of the context.
    """
    contexts = self._config_file_data["v2"]["contexts"]
    if name not in contexts:
        raise ValueError(f'context "{name}" does not exist')
    self._config_file_data["v2"]["active_context"] = name
def write(self, config_file: Union[pathlib.Path, str]) ‑> None

Write the current config data to the specified file.

Parameters

config_file : str
The path to the config file.
Expand source code
def write(self, config_file: Union[Path, str]) -> None:
    """Write the current config data to the specified file.

    Parameters
    ----------
    config_file : str
        The path to the config file.
    """
    config_file = Path(os.path.expanduser(config_file)).resolve()
    with config_file.open("w") as file_out:
        file_out.write(json.dumps(self._config_file_data, indent=2))
        file_out.write("\n")
class Context (source: Optional[int] = None, pachd_address: Optional[str] = None, server_cas: Optional[str] = None, session_token: Optional[str] = None, active_transaction: Optional[str] = None, cluster_name: Optional[str] = None, auth_info: Optional[str] = None, namespace: Optional[str] = None, cluster_deployment_id: Optional[str] = None, project: Optional[str] = None, enterprise_server: bool = False, port_forwarders: Dict[str, int] = None)

A context contains all the information needed to connect to a pachyderm instance. Not all fields need to be present for the context to be valid.

Expand source code
@dataclass
class Context:
    """A context contains all the information needed to connect to a pachyderm
    instance. Not all fields need to be present for the context to be valid."""

    source: Optional[int] = None
    """An integer that specifies where the config came from. 
    This parameter is for internal use only and should not be modified."""

    pachd_address: Optional[str] = None
    """A host:port specification for connecting to pachd."""

    server_cas: Optional[str] = None
    """Trusted root certificates for the cluster, formatted as a 
    base64-encoded PEM. This is only set when TLS is enabled."""

    session_token: Optional[str] = None
    """A secret token identifying the current user within their pachyderm
    cluster. This is included in all RPCs and used to determine if a user's
    actions are authorized. This is only set when auth is enabled."""

    active_transaction: Optional[str] = None
    """The currently active transaction for batching together commands."""

    cluster_name: Optional[str] = None
    """The name of the underlying Kubernetes cluster."""

    auth_info: Optional[str] = None
    """The name of the underlying Kubernetes cluster’s auth credentials"""

    namespace: Optional[str] = None
    """The underlying Kubernetes cluster’s namespace"""

    cluster_deployment_id: Optional[str] = None
    """The pachyderm cluster deployment ID that is used to ensure the
    operations run on the expected cluster."""

    project: Optional[str] = None

    enterprise_server: bool = False
    """Whether the context represents an enterprise server."""

    port_forwarders: Dict[str, int] = None
    """A mapping of service name -> local port."""

    @property
    def active_pachd_address(self) -> str:
        """This pachd address factors in port-forwarding."""
        if self.pachd_address is None:
            port = 30650
            if self.port_forwarders:
                port = self.port_forwarders.get("pachd", 30650)
            return f"grpc://localhost:{port}"
        return self.pachd_address

    @property
    def server_cas_decoded(self) -> Optional[bytes]:
        """The base64 decoded root certificates in PEM format, if they exist."""
        if self.server_cas:
            return b64decode(bytes(self.server_cas, "utf-8"))

Class variables

var source : Optional[int]

An integer that specifies where the config came from. This parameter is for internal use only and should not be modified.

var pachd_address : Optional[str]

A host:port specification for connecting to pachd.

var server_cas : Optional[str]

Trusted root certificates for the cluster, formatted as a base64-encoded PEM. This is only set when TLS is enabled.

var session_token : Optional[str]

A secret token identifying the current user within their pachyderm cluster. This is included in all RPCs and used to determine if a user's actions are authorized. This is only set when auth is enabled.

var active_transaction : Optional[str]

The currently active transaction for batching together commands.

var cluster_name : Optional[str]

The name of the underlying Kubernetes cluster.

var auth_info : Optional[str]

The name of the underlying Kubernetes cluster’s auth credentials

var namespace : Optional[str]

The underlying Kubernetes cluster’s namespace

var cluster_deployment_id : Optional[str]

The pachyderm cluster deployment ID that is used to ensure the operations run on the expected cluster.

var project : Optional[str]
var enterprise_server : bool

Whether the context represents an enterprise server.

var port_forwarders : Dict[str, int]

A mapping of service name -> local port.

Instance variables

var active_pachd_address : str

This pachd address factors in port-forwarding.

Expand source code
@property
def active_pachd_address(self) -> str:
    """This pachd address factors in port-forwarding."""
    if self.pachd_address is None:
        port = 30650
        if self.port_forwarders:
            port = self.port_forwarders.get("pachd", 30650)
        return f"grpc://localhost:{port}"
    return self.pachd_address
var server_cas_decoded : Optional[bytes]

The base64 decoded root certificates in PEM format, if they exist.

Expand source code
@property
def server_cas_decoded(self) -> Optional[bytes]:
    """The base64 decoded root certificates in PEM format, if they exist."""
    if self.server_cas:
        return b64decode(bytes(self.server_cas, "utf-8"))