Source code for ayon_api.entity_hub

import re
import copy
import collections
import warnings
from abc import ABC, abstractmethod
import typing
from typing import Optional, Iterable, Dict, List, Set, Any

from ._api import get_server_api_connection
from .utils import create_entity_id, convert_entity_id, slugify_string

if typing.TYPE_CHECKING:
    from typing import Literal, Union

    StatusState = Literal["not_started", "in_progress", "done", "blocked"]
    EntityType = Literal["project", "folder", "task", "product", "version"]


class _CustomNone(object):
    def __init__(self, name=None):
        self._name = name or "CustomNone"

    def __repr__(self):
        return "<{}>".format(self._name)

    def __bool__(self):
        return False


UNKNOWN_VALUE = _CustomNone("UNKNOWN_VALUE")
PROJECT_PARENT_ID = _CustomNone("PROJECT_PARENT_ID")
_NOT_SET = _CustomNone("_NOT_SET")


[docs]class EntityHub(object): """Helper to create, update or remove entities in project. The hub is a guide to operation with folder entities and update of project. Project entity must already exist on server (can be only updated). Object is caching entities queried from server. They won't be required once they were queried, so it is recommended to create new hub or clear cache frequently. Todos: Listen to server events about entity changes to be able to update already queried entities. Args: project_name (str): Name of project where changes will happen. connection (ServerAPI): Connection to server with logged user. allow_data_changes (bool): This option gives ability to change 'data' key on entities. This is not recommended as 'data' may be used for secure information and would also slow down server queries. Content of 'data' key can't be received only GraphQl. """ def __init__( self, project_name, connection=None, allow_data_changes=None ): if not connection: connection = get_server_api_connection() major, minor, _, _, _ = connection.server_version_tuple path_start_with_slash = True if (major, minor) < (0, 6): path_start_with_slash = False if allow_data_changes is None: allow_data_changes = connection.graphql_allows_data_in_query self._connection = connection self._path_start_with_slash = path_start_with_slash self._project_name = project_name self._entities_by_id = {} self._entities_by_parent_id = collections.defaultdict(list) self._project_entity = UNKNOWN_VALUE self._allow_data_changes = allow_data_changes self._path_reset_queue = None @property def allow_data_changes(self): """Entity hub allows changes of 'data' key on entities. Data are private and not all users may have access to them. Older version of AYON server allowed to get 'data' for entity only using REST api calls, which means to query each entity on-by-one from server. Returns: bool: Data changes are allowed. """ return self._allow_data_changes @property def path_start_with_slash(self): """Folder path should start with slash. This changed in 0.6.x server version. Returns: bool: Path starts with slash. """ return self._path_start_with_slash @property def project_name(self): """Project name which is maintained by hub. Returns: str: Name of project. """ return self._project_name @property def project_entity(self): """Project entity. Returns: ProjectEntity: Project entity. """ if self._project_entity is UNKNOWN_VALUE: self.fill_project_from_server() return self._project_entity
[docs] def get_attributes_for_type(self, entity_type: "EntityType"): """Get attributes available for a type. Attributes are based on entity types. Todos: Use attribute schema to validate values on entities. Args: entity_type (EntityType): Entity type for which should be attributes received. Returns: Dict[str, Dict[str, Any]]: Attribute schemas that are available for entered entity type. """ return self._connection.get_attributes_for_type(entity_type)
[docs] def get_entity_by_id(self, entity_id: str) -> Optional["BaseEntity"]: """Receive entity by its id without entity type. The entity must be already existing in cached objects. Args: entity_id (str): Id of entity. Returns: Optional[BaseEntity]: Entity object or None. """ return self._entities_by_id.get(entity_id)
[docs] def get_folder_by_id( self, entity_id: str, allow_fetch: Optional[bool] = True, ) -> Optional["FolderEntity"]: """Get folder entity by id. Args: entity_id (str): Folder entity id. allow_fetch (bool): Try to fetch entity from server if is not available in cache. Returns: Optional[FolderEntity]: Folder entity object. """ if allow_fetch: return self.get_or_fetch_entity_by_id(entity_id, ["folder"]) return self._entities_by_id.get(entity_id)
[docs] def get_task_by_id( self, entity_id: str, allow_fetch: Optional[bool] = True, ) -> Optional["TaskEntity"]: """Get task entity by id. Args: entity_id (str): Id of task entity. allow_fetch (bool): Try to fetch entity from server if is not available in cache. Returns: Optional[TaskEntity]: Task entity object or None. """ if allow_fetch: return self.get_or_fetch_entity_by_id(entity_id, ["task"]) return self._entities_by_id.get(entity_id)
[docs] def get_product_by_id( self, entity_id: str, allow_fetch: Optional[bool] = True, ) -> Optional["ProductEntity"]: """Get product entity by id. Args: entity_id (str): Product id. allow_fetch (bool): Try to fetch entity from server if is not available in cache. Returns: Optional[ProductEntity]: Product entity object or None. """ if allow_fetch: return self.get_or_fetch_entity_by_id(entity_id, ["product"]) return self._entities_by_id.get(entity_id)
[docs] def get_version_by_id( self, entity_id: str, allow_fetch: Optional[bool] = True, ) -> Optional["VersionEntity"]: """Get version entity by id. Args: entity_id (str): Version id. allow_fetch (bool): Try to fetch entity from server if is not available in cache. Returns: Optional[VersionEntity]: Version entity object or None. """ if allow_fetch: return self.get_or_fetch_entity_by_id(entity_id, ["version"]) return self._entities_by_id.get(entity_id)
[docs] def get_or_fetch_entity_by_id( self, entity_id: str, entity_types: List["EntityType"], ): """Get or query entity based on it's id and possible entity types. This is a helper function when entity id is known but entity type may have multiple possible options. Args: entity_id (str): Entity id. entity_types (Iterable[str]): Possible entity types that can the id represent. e.g. '["folder", "project"]' """ existing_entity = self._entities_by_id.get(entity_id) if existing_entity is not None: return existing_entity if not entity_types: return None entity_type = None entity_data = None for entity_type in entity_types: if entity_type == "folder": entity_data = self._connection.get_folder_by_id( self.project_name, entity_id, fields=self._get_folder_fields(), own_attributes=True ) elif entity_type == "task": entity_data = self._connection.get_task_by_id( self.project_name, entity_id, fields=self._get_task_fields(), own_attributes=True ) elif entity_type == "product": entity_data = self._connection.get_product_by_id( self.project_name, entity_id, fields=self._get_product_fields(), ) elif entity_type == "version": entity_data = self._connection.get_version_by_id( self.project_name, entity_id, fields=self._get_version_fields(), ) else: raise ValueError( "Unknown entity type \"{}\"".format(entity_type) ) if entity_data: break if not entity_data: return None if entity_type == "folder": folder_entity = self.add_folder(entity_data) folder_entity.has_published_content = entity_data["hasProducts"] return folder_entity elif entity_type == "task": return self.add_task(entity_data) elif entity_type == "product": return self.add_product(entity_data) elif entity_type == "version": return self.add_version(entity_data) return None
[docs] def get_or_query_entity_by_id( self, entity_id: str, entity_types: List["EntityType"], ): warnings.warn( "Method 'get_or_query_entity_by_id' is deprecated. " "Please use 'get_or_fetch_entity_by_id' instead.", DeprecationWarning ) return self.get_or_fetch_entity_by_id(entity_id, entity_types)
@property def entities(self): """Iterator over available entities. Returns: Iterator[BaseEntity]: All queried/created entities cached in hub. """ for entity in self._entities_by_id.values(): yield entity
[docs] def add_new_folder( self, name: str, folder_type: str, parent_id: Optional[str] = UNKNOWN_VALUE, label: Optional[str] = None, path: Optional[str] = None, status: Optional[str] = UNKNOWN_VALUE, tags: Optional[List[str]] = None, attribs: Optional[Dict[str, Any]] = UNKNOWN_VALUE, data: Optional[Dict[str, Any]] = UNKNOWN_VALUE, thumbnail_id: Optional[str] = UNKNOWN_VALUE, active: bool = UNKNOWN_VALUE, entity_id: Optional[str] = None, created: Optional[bool] = True, ): """Create folder object and add it to entity hub. Args: name (str): Name of entity. folder_type (str): Type of folder. Folder type must be available in config of project folder types. parent_id (Union[str, None]): Id of parent entity. label (Optional[str]): Folder label. path (Optional[str]): Folder path. Path consist of all parent names with slash('/') used as separator. status (Optional[str]): Folder status. tags (Optional[List[str]]): Folder tags. attribs (Dict[str, Any]): Attribute values. data (Dict[str, Any]): Entity data (custom data). thumbnail_id (Union[str, None]): Id of entity's thumbnail. active (bool): Is entity active. entity_id (Optional[str]): Id of the entity. New id is created if not passed. created (Optional[bool]): Entity is new. When 'None' is passed the value is defined based on value of 'entity_id'. Returns: FolderEntity: Added folder entity. """ folder_entity = FolderEntity( name=name, folder_type=folder_type, parent_id=parent_id, label=label, path=path, status=status, tags=tags, attribs=attribs, data=data, thumbnail_id=thumbnail_id, active=active, entity_id=entity_id, created=created, entity_hub=self ) self.add_entity(folder_entity) return folder_entity
[docs] def add_new_task( self, name: str, task_type: str, folder_id: Optional[str] = UNKNOWN_VALUE, label: Optional[str] = None, status: Optional[str] = UNKNOWN_VALUE, tags: Optional[Iterable[str]] = None, attribs: Optional[Dict[str, Any]] = UNKNOWN_VALUE, data: Optional[Dict[str, Any]] = UNKNOWN_VALUE, assignees: Optional[Iterable[str]] = None, thumbnail_id: Optional[str] = UNKNOWN_VALUE, active: Optional[bool] = UNKNOWN_VALUE, entity_id: Optional[str] = None, created: Optional[bool] = True, parent_id: Optional[str] = UNKNOWN_VALUE, ): """Create task object and add it to entity hub. Args: name (str): Name of entity. task_type (str): Type of task. Task type must be available in config of project task types. folder_id (Union[str, None]): Parent folder id. label (Optional[str]): Task label. status (Optional[str]): Task status. tags (Optional[Iterable[str]]): Folder tags. attribs (Dict[str, Any]): Attribute values. data (Dict[str, Any]): Entity data (custom data). assignees (Optional[Iterable[str]]): User assignees to the task. thumbnail_id (Union[str, None]): Id of entity's thumbnail. active (bool): Is entity active. entity_id (Optional[str]): Id of the entity. New id is created if not passed. created (Optional[bool]): Entity is new. When 'None' is passed the value is defined based on value of 'entity_id'. parent_id (Union[str, None]): DEPRECATED Parent folder id. Returns: TaskEntity: Added task entity. """ if parent_id is not UNKNOWN_VALUE: warnings.warn( "Used deprecated argument 'parent_id'." " Use 'folder_id' instead.", DeprecationWarning ) folder_id = parent_id task_entity = TaskEntity( name=name, task_type=task_type, folder_id=folder_id, label=label, status=status, tags=tags, attribs=attribs, data=data, assignees=assignees, thumbnail_id=thumbnail_id, active=active, entity_id=entity_id, created=created, entity_hub=self, ) self.add_entity(task_entity) return task_entity
[docs] def add_new_product( self, name: str, product_type: str, folder_id: Optional["Union[str, _CustomNone]"] = UNKNOWN_VALUE, tags: Optional[Iterable[str]] = None, attribs: Optional[Dict[str, Any]] = UNKNOWN_VALUE, data: Optional[Dict[str, Any]] = UNKNOWN_VALUE, active: Optional[bool] = UNKNOWN_VALUE, entity_id: Optional[str] = None, created: Optional[bool] = True, ): """Create task object and add it to entity hub. Args: name (str): Name of entity. product_type (str): Type of product. folder_id (Union[str, None]): Parent folder id. tags (Optional[Iterable[str]]): Folder tags. attribs (Dict[str, Any]): Attribute values. data (Dict[str, Any]): Entity data (custom data). active (bool): Is entity active. entity_id (Optional[str]): Id of the entity. New id is created if not passed. created (Optional[bool]): Entity is new. When 'None' is passed the value is defined based on value of 'entity_id'. Returns: ProductEntity: Added product entity. """ product_entity = ProductEntity( name=name, product_type=product_type, folder_id=folder_id, tags=tags, attribs=attribs, data=data, active=active, entity_id=entity_id, created=created, entity_hub=self, ) self.add_entity(product_entity) return product_entity
[docs] def add_new_version( self, version: int, product_id: Optional["Union[str, _CustomNone]"] = UNKNOWN_VALUE, task_id: Optional["Union[str, _CustomNone]"] = UNKNOWN_VALUE, status: Optional[str] = UNKNOWN_VALUE, tags: Optional[Iterable[str]] = None, attribs: Optional[Dict[str, Any]] = UNKNOWN_VALUE, data: Optional[Dict[str, Any]] = UNKNOWN_VALUE, thumbnail_id: Optional[str] = UNKNOWN_VALUE, active: Optional[bool] = UNKNOWN_VALUE, entity_id: Optional[str] = None, created: Optional[bool] = True, ): """Create task object and add it to entity hub. Args: version (int): Version. product_id (Union[str, None]): Parent product id. task_id (Union[str, None]): Parent task id. status (Optional[str]): Task status. tags (Optional[Iterable[str]]): Folder tags. attribs (Dict[str, Any]): Attribute values. data (Dict[str, Any]): Entity data (custom data). thumbnail_id (Union[str, None]): Id of entity's thumbnail. active (bool): Is entity active. entity_id (Optional[str]): Id of the entity. New id is created if not passed. created (Optional[bool]): Entity is new. When 'None' is passed the value is defined based on value of 'entity_id'. Returns: VersionEntity: Added version entity. """ version_entity = VersionEntity( version=version, product_id=product_id, task_id=task_id, status=status, tags=tags, attribs=attribs, data=data, thumbnail_id=thumbnail_id, active=active, entity_id=entity_id, created=created, entity_hub=self, ) self.add_entity(version_entity) return version_entity
[docs] def add_folder(self, folder): """Create folder object and add it to entity hub. Args: folder (Dict[str, Any]): Folder entity data. Returns: FolderEntity: Added folder entity. """ folder_entity = FolderEntity.from_entity_data(folder, entity_hub=self) self.add_entity(folder_entity) return folder_entity
[docs] def add_task(self, task): """Create task object and add it to entity hub. Args: task (Dict[str, Any]): Task entity data. Returns: TaskEntity: Added task entity. """ task_entity = TaskEntity.from_entity_data(task, entity_hub=self) self.add_entity(task_entity) return task_entity
[docs] def add_product(self, product): """Create version object and add it to entity hub. Args: product (Dict[str, Any]): Version entity data. Returns: ProductEntity: Added version entity. """ product_entity = ProductEntity.from_entity_data( product, entity_hub=self ) self.add_entity(product_entity) return product_entity
[docs] def add_version(self, version): """Create version object and add it to entity hub. Args: version (Dict[str, Any]): Version entity data. Returns: VersionEntity: Added version entity. """ version_entity = VersionEntity.from_entity_data( version, entity_hub=self ) self.add_entity(version_entity) return version_entity
[docs] def add_entity(self, entity): """Add entity to hub cache. Args: entity (BaseEntity): Entity that should be added to hub's cache. """ self._entities_by_id[entity.id] = entity parent_children = self._entities_by_parent_id[entity.parent_id] if entity not in parent_children: parent_children.append(entity) if entity.parent_id is PROJECT_PARENT_ID: return parent = self._entities_by_id.get(entity.parent_id) if parent is not None: parent.add_child(entity.id)
[docs] def folder_path_reseted(self, folder_id): """Method called from 'FolderEntity' on path reset. This should reset cache of folder paths on all children entities. The path cache is always propagated from top to bottom so if an entity has not cached path it means that any children can't have it cached. """ if self._path_reset_queue is not None: self._path_reset_queue.append(folder_id) return self._path_reset_queue = collections.deque() self._path_reset_queue.append(folder_id) while self._path_reset_queue: children = self._entities_by_parent_id[folder_id] for child in children: # Get child path but don't trigger cache path = child.get_path(False) if path is not None: # Reset it's path cache if is set child.reset_path() else: self._path_reset_queue.append(child.id) self._path_reset_queue = None
[docs] def unset_entity_parent(self, entity_id, parent_id): entity = self._entities_by_id.get(entity_id) parent = self._entities_by_id.get(parent_id) children_ids = UNKNOWN_VALUE if parent is not None: children_ids = parent.get_children_ids(False) has_set_parent = False if entity is not None: has_set_parent = entity.parent_id == parent_id new_parent_id = None if has_set_parent: entity.parent_id = new_parent_id if children_ids is not UNKNOWN_VALUE and entity_id in children_ids: parent.remove_child(entity_id) if entity is None or not has_set_parent: self.reset_immutable_for_hierarchy_cache(parent_id) return orig_parent_children = self._entities_by_parent_id[parent_id] if entity in orig_parent_children: orig_parent_children.remove(entity) new_parent_children = self._entities_by_parent_id[new_parent_id] if entity not in new_parent_children: new_parent_children.append(entity) self.reset_immutable_for_hierarchy_cache(parent_id)
[docs] def set_entity_parent(self, entity_id, parent_id, orig_parent_id=_NOT_SET): parent = self._entities_by_id.get(parent_id) entity = self._entities_by_id.get(entity_id) if entity is None: if parent is not None: children_ids = parent.get_children_ids(False) if ( children_ids is not UNKNOWN_VALUE and entity_id in children_ids ): parent.remove_child(entity_id) self.reset_immutable_for_hierarchy_cache(parent.id) return if orig_parent_id is _NOT_SET: orig_parent_id = entity.parent_id if orig_parent_id == parent_id: return orig_parent_children = self._entities_by_parent_id[orig_parent_id] if entity in orig_parent_children: orig_parent_children.remove(entity) self.reset_immutable_for_hierarchy_cache(orig_parent_id) orig_parent = self._entities_by_id.get(orig_parent_id) if orig_parent is not None: orig_parent.remove_child(entity_id) parent_children = self._entities_by_parent_id[parent_id] if entity not in parent_children: parent_children.append(entity) entity.parent_id = parent_id if parent is None or parent.get_children_ids(False) is UNKNOWN_VALUE: return parent.add_child(entity_id) self.reset_immutable_for_hierarchy_cache(parent_id)
def _fetch_entity_children(self, entity): folder_fields = self._get_folder_fields() task_fields = self._get_task_fields() tasks = [] folders = [] if entity.entity_type == "project": folders = list(self._connection.get_folders( entity["name"], parent_ids=[entity.id], fields=folder_fields, own_attributes=True, )) elif entity.entity_type == "folder": folders = list(self._connection.get_folders( self.project_entity["name"], parent_ids=[entity.id], fields=folder_fields, own_attributes=True, )) tasks = list(self._connection.get_tasks( self.project_entity["name"], folder_ids=[entity.id], fields=task_fields, own_attributes=True, )) children_ids = { child.id for child in self._entities_by_parent_id[entity.id] } for folder in folders: folder_entity = self._entities_by_id.get(folder["id"]) if folder_entity is None: folder_entity = self.add_folder(folder) children_ids.add(folder_entity.id) elif folder_entity.parent_id == entity.id: children_ids.add(folder_entity.id) folder_entity.has_published_content = folder["hasProducts"] for task in tasks: task_entity = self._entities_by_id.get(task["id"]) if task_entity is not None: if task_entity.parent_id == entity.id: children_ids.add(task_entity.id) continue task_entity = self.add_task(task) children_ids.add(task_entity.id) entity.fill_children_ids(children_ids)
[docs] def get_entity_children(self, entity, allow_fetch=True): children_ids = entity.get_children_ids(allow_fetch=False) if children_ids is not UNKNOWN_VALUE: return entity.get_children() if children_ids is UNKNOWN_VALUE and not allow_fetch: return UNKNOWN_VALUE self._fetch_entity_children(entity) return entity.get_children()
[docs] def delete_entity(self, entity): parent_id = entity.parent_id if parent_id is None: return parent = self._entities_by_id.get(parent_id) if parent is not None: parent.remove_child(entity.id) else: self.unset_entity_parent(entity.id, parent_id)
[docs] def reset_immutable_for_hierarchy_cache( self, entity_id: Optional[str], bottom_to_top: Optional[bool] = True ): if bottom_to_top is None or entity_id is None: return reset_queue = collections.deque() reset_queue.append(entity_id) if bottom_to_top: while reset_queue: entity_id: str = reset_queue.popleft() entity: Optional["BaseEntity"] = self.get_entity_by_id( entity_id ) if entity is None: continue entity.reset_immutable_for_hierarchy_cache(None) reset_queue.append(entity.parent_id) else: while reset_queue: entity_id: str = reset_queue.popleft() entity: Optional["BaseEntity"] = self.get_entity_by_id( entity_id ) if entity is None: continue entity.reset_immutable_for_hierarchy_cache(None) for child in self._entities_by_parent_id[entity.id]: reset_queue.append(child.id)
[docs] def fill_project_from_server(self): """Query project data from server and create project entity. This method will invalidate previous object of Project entity. Returns: ProjectEntity: Entity that was updated with server data. Raises: ValueError: When project was not found on server. """ project_name = self.project_name project = self._connection.get_project( project_name, own_attributes=True ) if not project: raise ValueError( "Project \"{}\" was not found.".format(project_name) ) major, minor, _, _, _ = self._connection.get_server_version_tuple() status_scope_supported = True if (major, minor) < (1, 5): status_scope_supported = False self._project_entity = ProjectEntity.from_entity_data( project, self ) self._project_entity.set_status_scope_supported( status_scope_supported ) self.add_entity(self._project_entity) return self._project_entity
def _get_folder_fields(self) -> Set[str]: folder_fields = set( self._connection.get_default_fields_for_type("folder") ) folder_fields.add("hasProducts") if self._allow_data_changes: folder_fields.add("data") return folder_fields def _get_task_fields(self) -> Set[str]: return set( self._connection.get_default_fields_for_type("task") ) def _get_product_fields(self) -> Set[str]: return set( self._connection.get_default_fields_for_type("product") ) def _get_version_fields(self) -> Set[str]: return set( self._connection.get_default_fields_for_type("version") )
[docs] def fetch_hierarchy_entities(self): """Query whole project at once.""" project_entity = self.fill_project_from_server() folder_fields = self._get_folder_fields() task_fields = self._get_task_fields() folders = self._connection.get_folders( project_entity.name, fields=folder_fields, own_attributes=True, ) tasks = self._connection.get_tasks( project_entity.name, fields=task_fields, own_attributes=True, ) folders_by_parent_id = collections.defaultdict(list) for folder in folders: parent_id = folder["parentId"] folders_by_parent_id[parent_id].append(folder) tasks_by_parent_id = collections.defaultdict(list) for task in tasks: parent_id = task["folderId"] tasks_by_parent_id[parent_id].append(task) lock_queue = collections.deque() hierarchy_queue = collections.deque() hierarchy_queue.append((None, project_entity)) while hierarchy_queue: item = hierarchy_queue.popleft() parent_id, parent_entity = item lock_queue.append(parent_entity) children_ids = set() for folder in folders_by_parent_id[parent_id]: folder_entity = self.add_folder(folder) children_ids.add(folder_entity.id) folder_entity.has_published_content = folder["hasProducts"] hierarchy_queue.append((folder_entity.id, folder_entity)) for task in tasks_by_parent_id[parent_id]: task_entity = self.add_task(task) lock_queue.append(task_entity) children_ids.add(task_entity.id) parent_entity.fill_children_ids(children_ids) # Lock entities when all are added to hub # - lock only entities added in this method while lock_queue: entity = lock_queue.popleft() entity.lock()
[docs] def query_entities_from_server(self): warnings.warn( "Method 'query_entities_from_server' is deprecated." " Please use 'fetch_hierarchy_entities' instead.", DeprecationWarning ) return self.fetch_hierarchy_entities()
[docs] def lock(self): if self._project_entity is None: return for entity in self._entities_by_id.values(): entity.lock()
def _get_top_entities(self): all_ids = set(self._entities_by_id.keys()) return [ entity for entity in self._entities_by_id.values() if entity.parent_id not in all_ids ] def _split_entities(self): top_entities = self._get_top_entities() entities_queue = collections.deque(top_entities) removed_entity_ids = [] created_entity_ids = [] other_entity_ids = [] while entities_queue: entity = entities_queue.popleft() removed = entity.removed if removed: removed_entity_ids.append(entity.id) elif entity.created: created_entity_ids.append(entity.id) else: other_entity_ids.append(entity.id) for child in tuple(self._entities_by_parent_id[entity.id]): if removed: self.unset_entity_parent(child.id, entity.id) entities_queue.append(child) return created_entity_ids, other_entity_ids, removed_entity_ids def _get_update_body(self, entity, changes=None): if changes is None: changes = entity.changes if not changes: return None return { "type": "update", "entityType": entity.entity_type, "entityId": entity.id, "data": changes } def _get_create_body(self, entity): return { "type": "create", "entityType": entity.entity_type, "entityId": entity.id, "data": entity.to_create_body_data() } def _get_delete_body(self, entity): return { "type": "delete", "entityType": entity.entity_type, "entityId": entity.id } def _pre_commit_types_changes( self, project_changes, orig_types, changes_key, post_changes ): """Compare changes of types on a project. Compare old and new types. Change project changes content if some old types were removed. In that case the final change of types will happen when all other entities have changed. Args: project_changes (dict[str, Any]): Project changes. orig_types (list[dict[str, Any]]): Original types. changes_key (Literal["folderTypes", "taskTypes"]): Key of type changes in project changes. post_changes (dict[str, Any]): An object where post changes will be stored. """ if changes_key not in project_changes: return new_types = project_changes[changes_key] orig_types_by_name = { type_info["name"]: type_info for type_info in orig_types } new_names = { type_info["name"] for type_info in new_types } diff_names = set(orig_types_by_name) - new_names if not diff_names: return # Create copy of folder type changes to post changes # - post changes will be commited at the end post_changes[changes_key] = copy.deepcopy(new_types) for type_name in diff_names: new_types.append(orig_types_by_name[type_name]) def _pre_commit_project(self): """Some project changes cannot be committed before hierarchy changes. It is not possible to change folder types or task types if there are existing hierarchy items using the removed types. For that purposes is first committed union of all old and new types and post changes are prepared when all existing entities are changed. Returns: dict[str, Any]: Changes that will be committed after hierarchy changes. """ project_changes = self.project_entity.changes post_changes = {} if not project_changes: return post_changes self._pre_commit_types_changes( project_changes, self.project_entity.get_orig_folder_types(), "folderType", post_changes ) self._pre_commit_types_changes( project_changes, self.project_entity.get_orig_task_types(), "taskType", post_changes ) self._connection.update_project(self.project_name, **project_changes) return post_changes
[docs] def commit_changes(self): """Commit any changes that happened on entities. Todo: Use Operations Session instead of known operations body. """ post_project_changes = self._pre_commit_project() self.project_entity.lock() project_changes = self.project_entity.changes if project_changes: response = self._connection.patch( "projects/{}".format(self.project_name), **project_changes ) response.raise_for_status() self.project_entity.lock() operations_body = [] created_entity_ids, other_entity_ids, removed_entity_ids = ( self._split_entities() ) processed_ids = set() for entity_id in other_entity_ids: if entity_id in processed_ids: continue entity = self._entities_by_id[entity_id] changes = entity.changes processed_ids.add(entity_id) if not changes: continue bodies = [self._get_update_body(entity, changes)] # Parent was created and was not yet added to operations body parent_queue = collections.deque() parent_queue.append(entity.parent_id) while parent_queue: # Make sure entity's parents are created parent_id = parent_queue.popleft() if ( parent_id is UNKNOWN_VALUE or parent_id in processed_ids or parent_id not in created_entity_ids ): continue parent = self._entities_by_id.get(parent_id) processed_ids.add(parent.id) bodies.append(self._get_create_body(parent)) parent_queue.append(parent.id) operations_body.extend(reversed(bodies)) for entity_id in created_entity_ids: if entity_id in processed_ids: continue entity = self._entities_by_id[entity_id] processed_ids.add(entity_id) operations_body.append(self._get_create_body(entity)) for entity_id in reversed(removed_entity_ids): if entity_id in processed_ids: continue entity = self._entities_by_id.pop(entity_id) parent_children = self._entities_by_parent_id[entity.parent_id] if entity in parent_children: parent_children.remove(entity) if not entity.created: operations_body.append(self._get_delete_body(entity)) self._connection.send_batch_operations( self.project_name, operations_body ) if post_project_changes: self._connection.update_project( self.project_name, **post_project_changes) self.lock()
[docs]class AttributeValue(object): def __init__(self, value): self._value = value self._origin_value = copy.deepcopy(value)
[docs] def get_value(self): return self._value
[docs] def set_value(self, value): self._value = value
value = property(get_value, set_value) @property def changed(self): return self._value != self._origin_value
[docs] def lock(self): self._origin_value = copy.deepcopy(self._value)
[docs]class Attributes(object): """Object representing attribs of entity. Todos: This could be enhanced to know attribute schema and validate values based on the schema. Args: attrib_keys (Iterable[str]): Keys that are available in attribs of the entity. values (Optional[Dict[str, Any]]): Values of attributes. """ def __init__(self, attrib_keys, values=UNKNOWN_VALUE): if values in (UNKNOWN_VALUE, None): values = {} self._attributes = { key: AttributeValue(values.get(key)) for key in attrib_keys } def __contains__(self, key): return key in self._attributes def __getitem__(self, key): return self._attributes[key].value def __setitem__(self, key, value): self._attributes[key].set_value(value) def __iter__(self): for key in self._attributes: yield key
[docs] def keys(self): return self._attributes.keys()
[docs] def values(self): for attribute in self._attributes.values(): yield attribute.value
[docs] def items(self): for key, attribute in self._attributes.items(): yield key, attribute.value
[docs] def get(self, key, default=None): """Get value of attribute. Args: key (str): Attribute name. default (Any): Default value to return when attribute was not found. """ attribute = self._attributes.get(key) if attribute is None: return default return attribute.value
[docs] def set(self, key, value): """Change value of attribute. Args: key (str): Attribute name. value (Any): New value of the attribute. """ self[key] = value
[docs] def get_attribute(self, key): """Access to attribute object. Args: key (str): Name of attribute. Returns: AttributeValue: Object of attribute value. Raises: KeyError: When attribute is not available. """ return self._attributes[key]
[docs] def lock(self): for attribute in self._attributes.values(): attribute.lock()
@property def changes(self): """Attribute value changes. Returns: Dict[str, Any]: Key mapping with new values. """ return { attr_key: attribute.value for attr_key, attribute in self._attributes.items() if attribute.changed }
[docs] def to_dict(self, ignore_none=True): output = {} for key, value in self.items(): if ( value is UNKNOWN_VALUE or (ignore_none and value is None) ): continue output[key] = value return output
[docs]class EntityData(dict): """Wrapper for 'data' key on entity. Data on entity are arbitrary data that are not stored in any deterministic model. It is possible to store any data that can be parsed to json. It is not possible to store 'None' to root key. In that case the key is not stored, and removed if existed on entity. To be able to store 'None' value use nested data structure: .. highlight:: text .. code-block:: text { "sceneInfo": { "description": None, "camera": "camera1" } } """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._orig_data = copy.deepcopy(self)
[docs] def get_changes(self): """Changes in entity data. Removed keys have value set to 'None'. Returns: dict[str, Any]: Key mapping with changed values. """ keys = set(self.keys()) | set(self._orig_data.keys()) output = {} for key in keys: if key not in self: # Key was removed output[key] = None elif key not in self._orig_data: # New value was set output[key] = self[key] elif self[key] != self._orig_data[key]: # Value was changed output[key] = self[key] return output
[docs] def get_new_entity_value(self): """Value of data for new entity. Returns: dict[str, Any]: Data without None values. """ return { key: value for key, value in self.items() # Ignore 'None' values if value is not None }
[docs] def lock(self): """Lock changes of entity data.""" self._orig_data = copy.deepcopy(self)
[docs]class BaseEntity(ABC): """Object representation of entity from server which is capturing changes. All data on created object are expected as "current data" on server entity unless the entity has set 'created' to 'True'. So if new data should be stored to server entity then fill entity with server data first and then change them. Calling 'lock' method will mark entity as "saved" and all changes made on entity are set as "current data" on server. Args: entity_id (Optional[str]): Entity id. New id is created if not passed. parent_id (Optional[str]): Parent entity id. attribs (Optional[Dict[str, Any]]): Attribute values. data (Optional[Dict[str, Any]]): Entity data (custom data). thumbnail_id (Optional[str]): Thumbnail id. active (Optional[bool]): Is entity active. entity_hub (EntityHub): Object of entity hub which created object of the entity. created (Optional[bool]): Entity is new. When 'None' is passed the value is defined based on value of 'entity_id'. """ _supports_name = False _supports_label = False _supports_status = False _supports_tags = False _supports_thumbnail = False def __init__( self, entity_id: Optional[str] = None, parent_id: Optional["Union[str, _CustomNone]"] = UNKNOWN_VALUE, attribs: Optional[Dict[str, Any]] = UNKNOWN_VALUE, data: Optional[Dict[str, Any]] = UNKNOWN_VALUE, active: Optional[bool] = UNKNOWN_VALUE, created: Optional[bool] = None, entity_hub: EntityHub = None, # Optional arguments name=None, label=None, status: Optional[str] = UNKNOWN_VALUE, tags: Optional[List[str]] = None, thumbnail_id: Optional[str] = UNKNOWN_VALUE, ): if entity_hub is None: raise ValueError("Missing required kwarg 'entity_hub'") self._entity_hub = entity_hub if created is None: created = entity_id is None entity_id = self._prepare_entity_id(entity_id) if data is None: data = EntityData() elif data is not UNKNOWN_VALUE: data = EntityData(data) children_ids = UNKNOWN_VALUE if created: children_ids = set() if not created and parent_id is UNKNOWN_VALUE: raise ValueError("Existing entity is missing parent id.") if tags is None: tags = [] else: tags = list(tags) # These are public without any validation at this moment # may change in future (e.g. name will have regex validation) self._entity_id = entity_id self._parent_id = parent_id self.active = active self._created = created self._attribs = Attributes( self._get_attributes_for_type(self.entity_type), attribs ) self._data = data self._children_ids = children_ids self._orig_parent_id = parent_id self._orig_active = active # Optional only if supported by entity type self._name = name self._label = label self._status = status self._tags = copy.deepcopy(tags) self._thumbnail_id = thumbnail_id self._orig_name = name self._orig_label = label self._orig_status = status self._orig_tags = copy.deepcopy(tags) self._orig_thumbnail_id = thumbnail_id self._immutable_for_hierarchy_cache = None def __repr__(self): return "<{} - {}>".format(self.__class__.__name__, self.id) def __getitem__(self, item): return getattr(self, item) def __setitem__(self, item, value): return setattr(self, item, value) def _prepare_entity_id(self, entity_id: Any) -> str: entity_id = convert_entity_id(entity_id) if entity_id is None: entity_id = create_entity_id() return entity_id @property def id(self) -> str: """Access to entity id under which is entity available on server. Returns: str: Entity id. """ return self._entity_id @property def removed(self) -> bool: return self._parent_id is None @property def orig_parent_id(self): return self._orig_parent_id @property def attribs(self): """Entity attributes based on server configuration. Returns: Attributes: Attributes object handling changes and values of attributes on entity. """ return self._attribs @property def data(self): """Entity custom data that are not stored by any deterministic model. Be aware that 'data' can't be queried using GraphQl and cannot be updated partially. Returns: EntityData: Custom data on entity. """ return self._data @property def project_name(self) -> str: """Quick access to project from entity hub. Returns: str: Name of project under which entity lives. """ return self._entity_hub.project_name @property @abstractmethod def entity_type(self) -> "EntityType": """Entity type corresponding to server. Returns: EntityType: Entity type. """ pass @property @abstractmethod def parent_entity_types(self) -> List[str]: """Entity type corresponding to server. Returns: List[str]: Possible entity types of parent. """ pass @property @abstractmethod def changes(self) -> Optional[Dict[str, Any]]: """Receive entity changes. Returns: Optional[Dict[str, Any]]: All values that have changed on entity. New entity must return None. """ pass
[docs] @classmethod @abstractmethod def from_entity_data( cls, entity_data: Dict[str, Any], entity_hub: EntityHub ) -> "BaseEntity": """Create entity based on queried data from server. Args: entity_data (Dict[str, Any]): Entity data from server. entity_hub (EntityHub): Hub which handle the entity. Returns: BaseEntity: Object of the class. """ pass
[docs] @abstractmethod def to_create_body_data(self) -> Dict[str, Any]: """Convert object of entity to data for server on creation. Returns: Dict[str, Any]: Entity data. """ pass
@property def immutable_for_hierarchy(self) -> bool: """Entity is immutable for hierarchy changes. Hierarchy changes can be considered as change of name or parents. Returns: bool: Entity is immutable for hierarchy changes. """ if self._immutable_for_hierarchy_cache is not None: return self._immutable_for_hierarchy_cache immutable_for_hierarchy = self._immutable_for_hierarchy if immutable_for_hierarchy is not None: self._immutable_for_hierarchy_cache = immutable_for_hierarchy return self._immutable_for_hierarchy_cache for child in self._entity_hub.get_entity_children(self): if child.immutable_for_hierarchy: self._immutable_for_hierarchy_cache = True return self._immutable_for_hierarchy_cache self._immutable_for_hierarchy_cache = False return self._immutable_for_hierarchy_cache @property def _immutable_for_hierarchy(self): """Override this method to define if entity object is immutable. This property was added to define immutable state of Folder entities which is used in property 'immutable_for_hierarchy'. Returns: Optional[bool]: Bool to explicitly telling if is immutable or not otherwise None. """ return None @property def has_cached_immutable_hierarchy(self) -> bool: return self._immutable_for_hierarchy_cache is not None
[docs] def reset_immutable_for_hierarchy_cache( self, bottom_to_top: Optional[bool] = True ): """Clear cache of immutable hierarchy property. This is used when entity changed parent or a child was added. Args: bottom_to_top (bool): Reset cache from top hierarchy to bottom or from bottom hierarchy to top. """ self._immutable_for_hierarchy_cache = None self._entity_hub.reset_immutable_for_hierarchy_cache( self.id, bottom_to_top )
def _get_default_changes(self): """Collect changes of common data on entity. Returns: Dict[str, Any]: Changes on entity. Key and it's new value. """ changes = {} if ( self._entity_hub.allow_data_changes and self._data is not UNKNOWN_VALUE ): data_changes = self._data.get_changes() if data_changes: changes["data"] = data_changes if self._orig_thumbnail_id != self._thumbnail_id: changes["thumbnailId"] = self._thumbnail_id if self._orig_active != self.active: changes["active"] = self.active attrib_changes = self.attribs.changes if attrib_changes: changes["attrib"] = attrib_changes if self._supports_name and self._orig_name != self._name: changes["name"] = self._name if self._supports_label: label = self._get_label_value() if label != self._orig_label: changes["label"] = label if self._supports_status and self._orig_status != self._status: changes["status"] = self._status if self._supports_tags and self._orig_tags != self._tags: changes["tags"] = self._tags return changes def _get_attributes_for_type(self, entity_type): return self._entity_hub.get_attributes_for_type(entity_type)
[docs] def lock(self): """Lock entity as 'saved' so all changes are discarded.""" self._orig_parent_id = self._parent_id self._orig_name = self._name self._orig_thumbnail_id = self._thumbnail_id if isinstance(self._data, EntityData): self._data.lock() self._attribs.lock() self._immutable_for_hierarchy_cache = None self._created = False if self._supports_label: self._orig_label = self._get_label_value() if self._supports_status: self._orig_status = self._status if self._supports_tags: self._orig_tags = copy.deepcopy(self._tags) if self._supports_thumbnail: self._orig_thumbnail_id = self._thumbnail_id
def _get_entity_by_id(self, entity_id): return self._entity_hub.get_entity_by_id(entity_id)
[docs] def get_parent_id(self): """Parent entity id. Returns: Optional[str]: Parent entity id or none if is not set. """ return self._parent_id
[docs] def set_parent_id(self, parent_id): """Change parent by id. Args: parent_id (Optional[str]): Id of new parent for entity. Raises: ValueError: If parent was not found by id. TypeError: If validation of parent does not pass. """ if parent_id != self._parent_id: orig_parent_id = self._parent_id self._parent_id = parent_id self._entity_hub.set_entity_parent( self.id, parent_id, orig_parent_id )
parent_id = property(get_parent_id, set_parent_id)
[docs] def get_parent(self, allow_fetch=True): """Parent entity. Returns: Optional[BaseEntity]: Parent object. """ parent = self._entity_hub.get_entity_by_id(self._parent_id) if parent is not None: return parent if not allow_fetch: return self._parent_id if self._parent_id is UNKNOWN_VALUE: return self._parent_id return self._entity_hub.get_or_fetch_entity_by_id( self._parent_id, self.parent_entity_types )
[docs] def set_parent(self, parent): """Change parent object. Args: parent (BaseEntity): New parent for entity. Raises: TypeError: If validation of parent does not pass. """ parent_id = None if parent is not None: parent_id = parent.id self._entity_hub.set_entity_parent(self.id, parent_id)
parent = property(get_parent, set_parent)
[docs] def get_children_ids(self, allow_fetch=True): """Access to children objects. Todos: Children should be maybe handled by EntityHub instead of entities themselves. That would simplify 'set_entity_parent', 'unset_entity_parent' and other logic related to changing hierarchy. Returns: Union[List[str], Type[UNKNOWN_VALUE]]: Children iterator. """ if self._children_ids is UNKNOWN_VALUE: if not allow_fetch: return self._children_ids self._entity_hub.get_entity_children(self, True) return set(self._children_ids)
children_ids = property(get_children_ids)
[docs] def get_children(self, allow_fetch=True): """Access to children objects. Returns: Union[List[BaseEntity], Type[UNKNOWN_VALUE]]: Children iterator. """ if self._children_ids is UNKNOWN_VALUE: if not allow_fetch: return self._children_ids return self._entity_hub.get_entity_children(self, True) return [ self._entity_hub.get_entity_by_id(children_id) for children_id in self._children_ids ]
children = property(get_children)
[docs] def add_child(self, child): """Add child entity. Args: child (BaseEntity): Child object to add. Raises: TypeError: When child object has invalid type to be children. """ child_id = child if isinstance(child_id, BaseEntity): child_id = child.id if self._children_ids is not UNKNOWN_VALUE: self._children_ids.add(child_id) self._entity_hub.set_entity_parent(child_id, self.id)
[docs] def remove_child(self, child): """Remove child entity. Is ignored if child is not in children. Args: child (Union[str, BaseEntity]): Child object or child id to remove. """ child_id = child if isinstance(child_id, BaseEntity): child_id = child.id if self._children_ids is not UNKNOWN_VALUE: self._children_ids.discard(child_id) self._entity_hub.unset_entity_parent(child_id, self.id)
def get_thumbnail_id(self): """Thumbnail id of entity. Returns: Optional[str]: Thumbnail id or none if is not set. """ return self._thumbnail_id def set_thumbnail_id(self, thumbnail_id): """Change thumbnail id. Args: thumbnail_id (Union[str, None]): Thumbnail id for entity. """ self._thumbnail_id = thumbnail_id thumbnail_id = property(get_thumbnail_id, set_thumbnail_id) @property def created(self): """Entity is new. Returns: bool: Entity is newly created. """ return self._created
[docs] def fill_children_ids(self, children_ids): """Fill children ids on entity. Warning: This is not an api call but is called from entity hub. """ self._children_ids = set(children_ids)
[docs] def get_name(self): if not self._supports_name: raise NotImplementedError( f"Name is not supported for '{self.entity_type}'." ) return self._name
[docs] def set_name(self, name): if not self._supports_name: raise NotImplementedError( f"Name is not supported for '{self.entity_type}'." ) if not isinstance(name, str): raise TypeError("Name must be a string.") self._name = name
name = property(get_name, set_name)
[docs] def get_label(self) -> Optional[str]: if not self._supports_label: raise NotImplementedError( f"Label is not supported for '{self.entity_type}'." ) return self._label
[docs] def set_label(self, label: Optional[str]): if not self._supports_label: raise NotImplementedError( f"Label is not supported for '{self.entity_type}'." ) self._label = label
def _get_label_value(self): """Get label value that will be used for operations. Returns: Optional[str]: Label value. """ label = self._label if not label or self._name == label: return None return label label = property(get_label, set_label)
[docs] def get_thumbnail_id(self): """Thumbnail id of entity. Returns: Optional[str]: Thumbnail id or none if is not set. """ if not self._supports_thumbnail: raise NotImplementedError( f"Thumbnail is not supported for '{self.entity_type}'." ) return self._thumbnail_id
[docs] def set_thumbnail_id(self, thumbnail_id): """Change thumbnail id. Args: thumbnail_id (Union[str, None]): Thumbnail id for entity. """ if not self._supports_thumbnail: raise NotImplementedError( f"Thumbnail is not supported for '{self.entity_type}'." ) self._thumbnail_id = thumbnail_id
thumbnail_id = property(get_thumbnail_id, set_thumbnail_id)
[docs] def get_status(self) -> "Union[str, _CustomNone]": """Folder status. Returns: Union[str, UNKNOWN_VALUE]: Folder status or 'UNKNOWN_VALUE'. """ if not self._supports_status: raise NotImplementedError( f"Status is not supported for '{self.entity_type}'." ) return self._status
[docs] def set_status(self, status_name: str): """Set folder status. Args: status_name (str): Status name. """ if not self._supports_status: raise NotImplementedError( f"Status is not supported for '{self.entity_type}'." ) project_entity = self._entity_hub.project_entity status = project_entity.get_status_by_slugified_name(status_name) if status is None: raise ValueError( f"Status {status_name} is not available on project." ) if not status.is_available_for_entity_type(self.entity_type): raise ValueError( f"Status {status_name} is not available for folder." ) self._status = status_name
status = property(get_status, set_status)
[docs] def get_tags(self): """Task tags. Returns: list[str]: Task tags. """ if not self._supports_tags: raise NotImplementedError( f"Tags are not supported for '{self.entity_type}'." ) return self._tags
[docs] def set_tags(self, tags): """Change tags. Args: tags (Iterable[str]): Tags. """ if not self._supports_tags: raise NotImplementedError( f"Tags are not supported for '{self.entity_type}'." ) self._tags = list(tags)
tags = property(get_tags, set_tags)
[docs]class ProjectStatus: """Project status class. Args: name (str): Name of the status. e.g. 'In progress' short_name (Optional[str]): Short name of the status. e.g. 'IP' state (Optional[StatusState]): A state of the status. icon (Optional[str]): Icon of the status. e.g. 'play_arrow'. color (Optional[str]): Color of the status. e.g. '#eeeeee'. scope (Optional[Iterable[str]]): Scope of the status. e.g. ['folder']. index (Optional[int]): Index of the status. project_statuses (Optional[_ProjectStatuses]): Project statuses wrapper. """ valid_states = {"not_started", "in_progress", "done", "blocked"} valid_scope = { "folder", "task", "product", "version", "representation", "workfile" } color_regex = re.compile(r"#([a-f0-9]{6})$") default_state = "in_progress" default_color = "#eeeeee" def __init__( self, name, short_name=None, state=None, icon=None, color=None, scope=None, index=None, project_statuses=None, is_new=None, ): short_name = short_name or "" icon = icon or "" state = state or self.default_state color = color or self.default_color if scope is None: scope = self.valid_scope scope = set(scope) self._name = name self._short_name = short_name self._icon = icon self._slugified_name = None self._state = None self._color = None self._scope = scope self.set_state(state) self.set_color(color) self._original_name = name self._original_short_name = short_name self._original_icon = icon self._original_state = state self._original_color = color self._original_scope = set(scope) self._original_index = index self._index = index self._project_statuses = project_statuses if is_new is None: is_new = index is None or project_statuses is None self._is_new = is_new def __str__(self): short_name = "" if self.short_name: short_name = "({})".format(self.short_name) return "<{} {}{}>".format( self.__class__.__name__, self.name, short_name ) def __repr__(self): return str(self) def __getitem__(self, key): if key in { "name", "short_name", "icon", "state", "color", "slugified_name" }: return getattr(self, key) raise KeyError(key) def __setitem__(self, key, value): if key in {"name", "short_name", "icon", "state", "color"}: return setattr(self, key, value) raise KeyError(key)
[docs] def lock(self): """Lock status. Changes were commited and current values are now the original values. """ self._is_new = False self._original_name = self.name self._original_short_name = self.short_name self._original_icon = self.icon self._original_state = self.state self._original_color = self.color self._original_scope = self.scope self._original_index = self.index
[docs] def is_available_for_entity_type(self, entity_type): if self._scope is None: return True return entity_type in self._scope
[docs] @staticmethod def slugify_name(name): """Slugify status name for name comparison. Args: name (str): Name of the status. Returns: str: Slugified name. """ return slugify_string(name.lower())
[docs] def get_project_statuses(self): """Internal logic method. Returns: _ProjectStatuses: Project statuses object. """ return self._project_statuses
[docs] def set_project_statuses(self, project_statuses): """Internal logic method to change parent object. Args: project_statuses (_ProjectStatuses): Project statuses object. """ self._project_statuses = project_statuses
[docs] def unset_project_statuses(self, project_statuses): """Internal logic method to unset parent object. Args: project_statuses (_ProjectStatuses): Project statuses object. """ if self._project_statuses is project_statuses: self._project_statuses = None self._index = None
@property def changed(self): """Status has changed. Returns: bool: Status has changed. """ return ( self._is_new or self._original_name != self._name or self._original_short_name != self._short_name or self._original_index != self._index or self._original_state != self._state or self._original_icon != self._icon or self._original_color != self._color or self._original_scope != self._scope )
[docs] def delete(self): """Remove status from project statuses object.""" if self._project_statuses is not None: self._project_statuses.remove(self)
[docs] def get_index(self): """Get index of status. Returns: Union[int, None]: Index of status or None if status is not under project. """ return self._index
[docs] def set_index(self, index, **kwargs): """Change status index. Returns: Union[int, None]: Index of status or None if status is not under project. """ if kwargs.get("from_parent"): self._index = index else: self._project_statuses.set_status_index(self, index)
[docs] def get_name(self): """Status name. Returns: str: Status name. """ return self._name
[docs] def set_name(self, name): """Change status name. Args: name (str): New status name. """ if not isinstance(name, str): raise TypeError("Name must be a string.") if name == self._name: return self._name = name self._slugified_name = None
[docs] def get_short_name(self): """Status short name 3 letters tops. Returns: str: Status short name. """ return self._short_name
[docs] def set_short_name(self, short_name): """Change status short name. Args: short_name (str): New status short name. 3 letters tops. """ if not isinstance(short_name, str): raise TypeError("Short name must be a string.") self._short_name = short_name
[docs] def get_icon(self): """Name of icon to use for status. Returns: str: Name of the icon. """ return self._icon
[docs] def set_icon(self, icon): """Change status icon name. Args: icon (str): Name of the icon. """ if icon is None: icon = "" if not isinstance(icon, str): raise TypeError("Icon name must be a string.") self._icon = icon
@property def slugified_name(self): """Slugified and lowere status name. Can be used for comparison of existing statuses. e.g. 'In Progress' vs. 'in-progress'. Returns: str: Slugified and lower status name. """ if self._slugified_name is None: self._slugified_name = self.slugify_name(self.name) return self._slugified_name
[docs] def get_state(self): """Get state of project status. Return: StatusState: General state of status. """ return self._state
[docs] def set_state(self, state): """Set color of project status. Args: state (StatusState): General state of status. """ if state not in self.valid_states: raise ValueError("Invalid state '{}'".format(str(state))) self._state = state
[docs] def get_color(self): """Get color of project status. Returns: str: Status color. """ return self._color
[docs] def set_color(self, color): """Set color of project status. Args: color (str): Color in hex format. Example: '#ff0000'. """ if not isinstance(color, str): raise TypeError( "Color must be string got '{}'".format(type(color))) color = color.lower() if self.color_regex.fullmatch(color) is None: raise ValueError("Invalid color value '{}'".format(color)) self._color = color
[docs] def get_scope(self): """Get scope of the status. Returns: Set[str]: Scope of the status. """ return set(self._scope)
[docs] def set_scope(self, scope): """Get scope of the status. Returns: scope (Iterable[str]): Scope of the status. """ if not isinstance(scope, (list, set, tuple)): raise TypeError( f"Scope must be a list, set, tuple. Got '{type(scope)}'." ) scope = set(scope) invalid_entity_types = scope - self.valid_scope if invalid_entity_types: raise ValueError("Invalid scope values '{}'".format( ", ".join(invalid_entity_types) )) self._scope = scope
name = property(get_name, set_name) short_name = property(get_short_name, set_short_name) project_statuses = property(get_project_statuses, set_project_statuses) index = property(get_index, set_index) state = property(get_state, set_state) color = property(get_color, set_color) icon = property(get_icon, set_icon) scope = property(get_scope, set_scope) def _validate_other_p_statuses(self, other): """Validate if other status can be used for move. To be able to work with other status, and position them in relation, they must belong to same existing object of '_ProjectStatuses'. Args: other (ProjectStatus): Other status to validate. """ o_project_statuses = other.project_statuses m_project_statuses = self.project_statuses if o_project_statuses is None and m_project_statuses is None: raise ValueError("Both statuses are not assigned to a project.") missing_status = None if o_project_statuses is None: missing_status = other elif m_project_statuses is None: missing_status = self if missing_status is not None: raise ValueError( "Status '{}' is not assigned to a project.".format( missing_status.name)) if m_project_statuses is not o_project_statuses: raise ValueError( "Statuse are assigned to different projects." " Cannot execute move." )
[docs] def move_before(self, other): """Move status before other status. Args: other (ProjectStatus): Status to move before. """ self._validate_other_p_statuses(other) self._project_statuses.set_status_index(self, other.index)
[docs] def move_after(self, other): """Move status after other status. Args: other (ProjectStatus): Status to move after. """ self._validate_other_p_statuses(other) self._project_statuses.set_status_index(self, other.index + 1)
[docs] def to_data(self): """Convert status to data. Returns: dict[str, str]: Status data. """ output = { "name": self.name, "shortName": self.short_name, "state": self.state, "icon": self.icon, "color": self.color, "scope": list(self._scope), } if ( not self._is_new and self._original_name and self.name != self._original_name ): output["original_name"] = self._original_name return output
[docs] @classmethod def from_data(cls, data, index=None, project_statuses=None): """Create project status from data. Args: data (dict[str, str]): Status data. index (Optional[int]): Status index. project_statuses (Optional[ProjectStatuses]): Project statuses object which wraps the status for a project. """ return cls( data["name"], data.get("shortName", data.get("short_name")), data.get("state"), data.get("icon"), data.get("color"), data.get("scope"), index=index, project_statuses=project_statuses )
class _ProjectStatuses: """Wrapper for project statuses. Supports basic methods to add, change or remove statuses from a project. To add new statuses use 'create' or 'add_status' methods. To change statuses receive them by one of the getter methods and change their values. Todo: Validate if statuses are duplicated. """ def __init__(self, statuses): self._statuses = [ ProjectStatus.from_data(status, idx, self) for idx, status in enumerate(statuses) ] self._scope_supported = False self._orig_status_length = len(self._statuses) self._set_called = False def __len__(self): return len(self._statuses) def __iter__(self): """Iterate over statuses. Yields: ProjectStatus: Project status. """ for status in self._statuses: yield status def create( self, name, short_name=None, state=None, icon=None, color=None, scope=None, ): """Create project status. Args: name (str): Name of the status. e.g. 'In progress' short_name (Optional[str]): Short name of the status. e.g. 'IP' state (Optional[StatusState]): A state of the status. icon (Optional[str]): Icon of the status. e.g. 'play_arrow'. color (Optional[str]): Color of the status. e.g. '#eeeeee'. scope (Optional[List[str]]): Scope of the status. e.g. ['folder']. Returns: ProjectStatus: Created project status. """ status = ProjectStatus( name, short_name, state, icon, color, scope, is_new=True ) self.append(status) return status def set_status_scope_supported(self, supported: bool): self._scope_supported = supported def lock(self): """Lock statuses. Changes were commited and current values are now the original values. """ self._orig_status_length = len(self._statuses) self._set_called = False for status in self._statuses: status.lock() def to_data(self): """Convert to project statuses data.""" output = [ status.to_data() for status in self._statuses ] # Remove scope if is not supported if not self._scope_supported: for item in output: item.pop("scope") return output def set(self, statuses): """Explicitly override statuses. This method does not handle if statuses changed or not. Args: statuses (list[dict[str, str]]): List of statuses data. """ self._set_called = True self._statuses = [ ProjectStatus.from_data(status, idx, self) for idx, status in enumerate(statuses) ] @property def changed(self): """Statuses have changed. Returns: bool: True if statuses changed, False otherwise. """ if self._set_called: return True # Check if status length changed # - when all statuses are removed it is a changed if self._orig_status_length != len(self._statuses): return True # Go through all statuses and check if any of them changed for status in self._statuses: if status.changed: return True return False def get(self, name, default=None): """Get status by name. Args: name (str): Status name. default (Any): Default value of status is not found. Returns: Union[ProjectStatus, Any]: Status or default value. """ return next( ( status for status in self._statuses if status.name == name ), default ) get_status_by_name = get def index(self, status, **kwargs): """Get status index. Args: status (ProjectStatus): Status to get index of. default (Optional[Any]): Default value if status is not found. Returns: Union[int, Any]: Status index. Raises: ValueError: If status is not found and default value is not defined. """ output = next( ( idx for idx, st in enumerate(self._statuses) if st is status ), None ) if output is not None: return output if "default" in kwargs: return kwargs["default"] raise ValueError("Status '{}' not found".format(status.name)) def get_status_by_slugified_name(self, name): """Get status by slugified name. Args: name (str): Status name. Is slugified before search. Returns: Union[ProjectStatus, None]: Status or None if not found. """ slugified_name = ProjectStatus.slugify_name(name) return next( ( status for status in self._statuses if status.slugified_name == slugified_name ), None ) def remove_by_name(self, name, ignore_missing=False): """Remove status by name. Args: name (str): Status name. ignore_missing (Optional[bool]): If True, no error is raised if status is not found. Returns: ProjectStatus: Removed status. """ matching_status = self.get(name) if matching_status is None: if ignore_missing: return raise ValueError( "Status '{}' not found in project".format(name)) return self.remove(matching_status) def remove(self, status, ignore_missing=False): """Remove status. Args: status (ProjectStatus): Status to remove. ignore_missing (Optional[bool]): If True, no error is raised if status is not found. Returns: Union[ProjectStatus, None]: Removed status. """ index = self.index(status, default=None) if index is None: if ignore_missing: return None raise ValueError("Status '{}' not in project".format(status)) return self.pop(index) def pop(self, index): """Remove status by index. Args: index (int): Status index. Returns: ProjectStatus: Removed status. """ status = self._statuses.pop(index) status.unset_project_statuses(self) for st in self._statuses[index:]: st.set_index(st.index - 1, from_parent=True) return status def insert(self, index, status): """Insert status at index. Args: index (int): Status index. status (Union[ProjectStatus, dict[str, str]]): Status to insert. Can be either status object or status data. Returns: ProjectStatus: Inserted status. """ if not isinstance(status, ProjectStatus): status = ProjectStatus.from_data(status) start_index = index end_index = len(self._statuses) + 1 matching_index = self.index(status, default=None) if matching_index is not None: if matching_index == index: status.set_index(index, from_parent=True) return self._statuses.pop(matching_index) if matching_index < index: start_index = matching_index end_index = index + 1 else: end_index -= 1 status.set_project_statuses(self) self._statuses.insert(index, status) for idx, st in enumerate(self._statuses[start_index:end_index]): st.set_index(start_index + idx, from_parent=True) return status def append(self, status): """Add new status to the end of the list. Args: status (Union[ProjectStatus, dict[str, str]]): Status to insert. Can be either status object or status data. Returns: ProjectStatus: Inserted status. """ return self.insert(len(self._statuses), status) def set_status_index(self, status, index): """Set status index. Args: status (ProjectStatus): Status to set index. index (int): New status index. """ return self.insert(index, status)
[docs]class ProjectEntity(BaseEntity): """Entity representing project on AYON server. Args: name (str): Name of entity. project_code (str): Project code. library (bool): Is project library project. folder_types (list[dict[str, Any]]): Folder types definition. task_types (list[dict[str, Any]]): Task types definition. statuses: (list[dict[str, Any]]): Statuses definition. attribs (Optional[Dict[str, Any]]): Attribute values. data (Dict[str, Any]): Entity data (custom data). active (bool): Is entity active. entity_hub (EntityHub): Object of entity hub which created object of the entity. """ _supports_name = True entity_type = "project" parent_entity_types = [] # TODO These are hardcoded but maybe should be used from server??? default_folder_type_icon = "folder" default_task_type_icon = "task_alt" def __init__( self, name: str, project_code: str, library: bool, folder_types: List[Dict[str, Any]], task_types: List[Dict[str, Any]], statuses: List[Dict[str, Any]], attribs: Optional[Dict[str, Any]] = UNKNOWN_VALUE, data: Optional[Dict[str, Any]] = UNKNOWN_VALUE, active: Optional[bool] = UNKNOWN_VALUE, entity_hub: EntityHub = None, ): super().__init__( entity_id=name, parent_id=PROJECT_PARENT_ID, attribs=attribs, data=data, active=active, created=False, entity_hub=entity_hub, name=name, ) self._project_code = project_code self._library_project = library self._folder_types = folder_types self._task_types = task_types self._statuses_obj = _ProjectStatuses(statuses) self._orig_project_code = project_code self._orig_library_project = library self._orig_folder_types = copy.deepcopy(folder_types) self._orig_task_types = copy.deepcopy(task_types) self._orig_statuses = copy.deepcopy(statuses) def _prepare_entity_id(self, entity_id): if entity_id != self.project_name: raise ValueError( "Unexpected entity id value \"{}\". Expected \"{}\"".format( entity_id, self.project_name)) return entity_id
[docs] def set_name(self, name): if self._name == name: return raise ValueError("It is not allowed to change project name.")
[docs] def get_parent(self, *args, **kwargs): return None
[docs] def set_parent(self, parent): raise ValueError( "Parent of project cannot be set to {}".format(parent) )
[docs] def set_status_scope_supported(self, supported: bool): self._statuses_obj.set_status_scope_supported(supported)
parent = property(get_parent, set_parent)
[docs] def get_orig_folder_types(self): return copy.deepcopy(self._orig_folder_types)
[docs] def get_folder_types(self): return copy.deepcopy(self._folder_types)
[docs] def set_folder_types(self, folder_types): new_folder_types = [] for folder_type in folder_types: if "icon" not in folder_type: folder_type["icon"] = self.default_folder_type_icon new_folder_types.append(folder_type) self._folder_types = new_folder_types
[docs] def get_orig_task_types(self): return copy.deepcopy(self._orig_task_types)
[docs] def get_task_types(self): return copy.deepcopy(self._task_types)
[docs] def set_task_types(self, task_types): new_task_types = [] for task_type in task_types: if "icon" not in task_type: task_type["icon"] = self.default_task_type_icon new_task_types.append(task_type) self._task_types = new_task_types
[docs] def get_orig_statuses(self): return copy.deepcopy(self._orig_statuses)
[docs] def get_statuses(self): return self._statuses_obj
[docs] def set_statuses(self, statuses): self._statuses_obj.set(statuses)
folder_types = property(get_folder_types, set_folder_types) task_types = property(get_task_types, set_task_types) statuses = property(get_statuses, set_statuses)
[docs] def get_status_by_slugified_name(self, name): """Find status by name. Args: name (str): Status name. Returns: Union[ProjectStatus, None]: Status object or None. """ return self._statuses_obj.get_status_by_slugified_name(name)
[docs] def lock(self): super().lock() self._orig_folder_types = copy.deepcopy(self._folder_types) self._orig_task_types = copy.deepcopy(self._task_types) self._statuses_obj.lock()
@property def changes(self): changes = self._get_default_changes() if self._orig_folder_types != self._folder_types: changes["folderTypes"] = self.get_folder_types() if self._orig_task_types != self._task_types: changes["taskTypes"] = self.get_task_types() if self._statuses_obj.changed: changes["statuses"] = self._statuses_obj.to_data() return changes
[docs] @classmethod def from_entity_data(cls, project, entity_hub) -> "ProjectEntity": return cls( project["name"], project["code"], library=project["library"], folder_types=project["folderTypes"], task_types=project["taskTypes"], statuses=project["statuses"], attribs=project["ownAttrib"], data=project["data"], active=project["active"], entity_hub=entity_hub, )
[docs] def to_create_body_data(self): raise NotImplementedError( "ProjectEntity does not support conversion to entity data" )
[docs]class FolderEntity(BaseEntity): """Entity representing a folder on AYON server. Args: name (str): Name of entity. folder_type (str): Type of folder. Folder type must be available in config of project folder types. parent_id (Union[str, None]): Id of parent entity. label (Optional[str]): Folder label. path (Optional[str]): Folder path. Path consist of all parent names with slash('/') used as separator. status (Optional[str]): Folder status. tags (Optional[List[str]]): Folder tags. attribs (Dict[str, Any]): Attribute values. data (Dict[str, Any]): Entity data (custom data). thumbnail_id (Union[str, None]): Id of entity's thumbnail. active (bool): Is entity active. entity_id (Union[str, None]): Id of the entity. New id is created if not passed. created (Optional[bool]): Entity is new. When 'None' is passed the value is defined based on value of 'entity_id'. entity_hub (EntityHub): Object of entity hub which created object of the entity. """ _supports_name = True _supports_label = True _supports_tags = True _supports_status = True _supports_thumbnail = True entity_type = "folder" parent_entity_types = ["folder", "project"] def __init__( self, name: str, folder_type: str, parent_id: Optional[str] = UNKNOWN_VALUE, label: Optional[str] = None, path: Optional[str] = None, status: Optional[str] = UNKNOWN_VALUE, tags: Optional[List[str]] = None, attribs: Optional[Dict[str, Any]] = UNKNOWN_VALUE, data: Optional[Dict[str, Any]] = UNKNOWN_VALUE, thumbnail_id: Optional[str] = UNKNOWN_VALUE, active: bool = UNKNOWN_VALUE, entity_id: Optional[str] = None, created: Optional[bool] = None, entity_hub: EntityHub = None, ): super().__init__( entity_id=entity_id, parent_id=parent_id, attribs=attribs, data=data, active=active, created=created, entity_hub=entity_hub, name=name, label=label, tags=tags, status=status, thumbnail_id=thumbnail_id, ) # Autofill project as parent of folder if is not yet set # - this can be guessed only if folder was just created if self.created and self._parent_id is UNKNOWN_VALUE: self._parent_id = self.project_name self._folder_type = folder_type self._orig_folder_type = folder_type # Know if folder has any products # - is used to know if folder allows hierarchy changes self._has_published_content = False self._path = path
[docs] def get_folder_type(self) -> str: return self._folder_type
[docs] def set_folder_type(self, folder_type: str): self._folder_type = folder_type
folder_type = property(get_folder_type, set_folder_type)
[docs] def get_path(self, dynamic_value=True): if not dynamic_value: return self._path if self._path is None: parent = self.parent if parent.entity_type == "folder": parent_path = parent.path path = "/".join([parent_path, self.name]) elif self._entity_hub.path_start_with_slash: path = "/{}".format(self.name) else: path = self.name self._path = path return self._path
[docs] def reset_path(self): self._path = None self._entity_hub.folder_path_reseted(self.id)
path = property(get_path)
[docs] def get_has_published_content(self): return self._has_published_content
[docs] def set_has_published_content(self, has_published_content): if self._has_published_content is has_published_content: return self._has_published_content = has_published_content # Reset immutable cache of parents self._entity_hub.reset_immutable_for_hierarchy_cache(self.id)
has_published_content = property( get_has_published_content, set_has_published_content ) @property def _immutable_for_hierarchy(self): if self.has_published_content: return True return None
[docs] def lock(self): super().lock() self._orig_folder_type = self._folder_type
@property def changes(self): changes = self._get_default_changes() if self._orig_parent_id != self._parent_id: parent_id = self._parent_id if parent_id == self.project_name: parent_id = None changes["parentId"] = parent_id if self._orig_folder_type != self._folder_type: changes["folderType"] = self._folder_type return changes
[docs] @classmethod def from_entity_data(cls, folder, entity_hub) -> "FolderEntity": parent_id = folder["parentId"] if parent_id is None: parent_id = entity_hub.project_entity.id return cls( name=folder["name"], folder_type=folder["folderType"], parent_id=parent_id, label=folder["label"], path=folder["path"], status=folder["status"], tags=folder["tags"], attribs=folder["ownAttrib"], data=folder.get("data"), thumbnail_id=folder["thumbnailId"], active=folder["active"], entity_id=folder["id"], created=False, entity_hub=entity_hub )
[docs] def to_create_body_data(self): parent_id = self._parent_id if parent_id is UNKNOWN_VALUE: raise ValueError("Folder does not have set 'parent_id'") if parent_id == self.project_name: parent_id = None if not self.name or self.name is UNKNOWN_VALUE: raise ValueError("Folder does not have set 'name'") output = { "name": self.name, "folderType": self.folder_type, "parentId": parent_id, } label = self._get_label_value() if label: output["label"] = label attrib = self.attribs.to_dict() if attrib: output["attrib"] = attrib # Add tags only if are available if self.tags: output["tags"] = list(self.tags) if self.status is not UNKNOWN_VALUE: output["status"] = self.status if self.active is not UNKNOWN_VALUE: output["active"] = self.active if self.thumbnail_id is not UNKNOWN_VALUE: output["thumbnailId"] = self.thumbnail_id if ( self._entity_hub.allow_data_changes and self._data is not UNKNOWN_VALUE ): output["data"] = self._data.get_new_entity_value() return output
[docs]class TaskEntity(BaseEntity): """Entity representing a task on AYON server. Args: name (str): Name of entity. task_type (str): Type of task. Task type must be available in config of project task types. folder_id (Union[str, None]): Parent folder id. label (Optional[str]): Task label. status (Optional[str]): Task status. tags (Optional[Iterable[str]]): Folder tags. attribs (Dict[str, Any]): Attribute values. data (Dict[str, Any]): Entity data (custom data). assignees (Optional[Iterable[str]]): User assignees to the task. thumbnail_id (Union[str, None]): Id of entity's thumbnail. active (bool): Is entity active. entity_id (Union[str, None]): Id of the entity. New id is created if not passed. created (Optional[bool]): Entity is new. When 'None' is passed the value is defined based on value of 'entity_id'. entity_hub (EntityHub): Object of entity hub which created object of the entity. """ _supports_name = True _supports_label = True _supports_tags = True _supports_status = True _supports_thumbnail = True entity_type = "task" parent_entity_types = ["folder"] def __init__( self, name: str, task_type: str, folder_id: Optional[str] = UNKNOWN_VALUE, label: Optional[str] = None, status: Optional[str] = UNKNOWN_VALUE, tags: Optional[Iterable[str]] = None, attribs: Optional[Dict[str, Any]] = UNKNOWN_VALUE, data: Optional[Dict[str, Any]] = UNKNOWN_VALUE, assignees: Optional[Iterable[str]] = None, thumbnail_id: Optional[str] = UNKNOWN_VALUE, active: Optional[bool] = UNKNOWN_VALUE, entity_id: Optional[str] = None, created: Optional[bool] = None, entity_hub: EntityHub = None, ): super().__init__( name=name, parent_id=folder_id, label=label, status=status, tags=tags, attribs=attribs, data=data, thumbnail_id=thumbnail_id, active=active, entity_id=entity_id, created=created, entity_hub=entity_hub, ) if assignees is None: assignees = [] else: assignees = list(assignees) self._task_type = task_type self._assignees = assignees self._orig_task_type = task_type self._orig_assignees = copy.deepcopy(assignees) self._children_ids = set()
[docs] def lock(self): super().lock() self._orig_task_type = self._task_type self._orig_assignees = copy.deepcopy(self._assignees)
[docs] def get_folder_id(self): return self._parent_id
[docs] def set_folder_id(self, folder_id): self.set_parent_id(folder_id)
folder_id = property(get_folder_id, set_folder_id)
[docs] def get_task_type(self) -> str: return self._task_type
[docs] def set_task_type(self, task_type: str): self._task_type = task_type
task_type = property(get_task_type, set_task_type)
[docs] def get_assignees(self): """Task assignees. Returns: list[str]: Task assignees. """ return self._assignees
[docs] def set_assignees(self, assignees): """Change assignees. Args: assignees (Iterable[str]): assignees. """ self._assignees = list(assignees)
assignees = property(get_assignees, set_assignees)
[docs] def add_child(self, child): raise ValueError("Task does not support to add children")
@property def changes(self): changes = self._get_default_changes() if self._orig_parent_id != self._parent_id: changes["folderId"] = self._parent_id if self._orig_task_type != self._task_type: changes["taskType"] = self._task_type if self._orig_assignees != self._assignees: changes["assignees"] = self._assignees return changes
[docs] @classmethod def from_entity_data(cls, task, entity_hub) -> "TaskEntity": return cls( name=task["name"], task_type=task["taskType"], folder_id=task["folderId"], label=task["label"], status=task["status"], tags=task["tags"], attribs=task["ownAttrib"], data=task.get("data"), assignees=task["assignees"], thumbnail_id=task["thumbnailId"], active=task["active"], entity_id=task["id"], created=False, entity_hub=entity_hub )
[docs] def to_create_body_data(self): if self.parent_id is UNKNOWN_VALUE: raise ValueError("Task does not have set 'parent_id'") output = { "name": self.name, "taskType": self.task_type, "folderId": self.parent_id, } label = self._get_label_value() if label: output["label"] = label attrib = self.attribs.to_dict() if attrib: output["attrib"] = attrib if self.active is not UNKNOWN_VALUE: output["active"] = self.active if self.status is not UNKNOWN_VALUE: output["status"] = self.status if self.tags: output["tags"] = self.tags if self.assignees: output["assignees"] = self.assignees if ( self._entity_hub.allow_data_changes and self._data is not UNKNOWN_VALUE ): output["data"] = self._data.get_new_entity_value() return output
[docs]class ProductEntity(BaseEntity): _supports_name = True _supports_tags = True entity_type = "product" parent_entity_types = ["folder"] def __init__( self, name: str, product_type: str, folder_id: Optional["Union[str, _CustomNone]"] = UNKNOWN_VALUE, tags: Optional[Iterable[str]] = None, attribs: Optional[Dict[str, Any]] = UNKNOWN_VALUE, data: Optional[Dict[str, Any]] = UNKNOWN_VALUE, active: Optional[bool] = UNKNOWN_VALUE, entity_id: Optional[str] = None, created: Optional[bool] = None, entity_hub: EntityHub = None, ): super().__init__( name=name, parent_id=folder_id, tags=tags, attribs=attribs, data=data, created=created, entity_id=entity_id, active=active, entity_hub=entity_hub, ) self._product_type = product_type self._orig_product_type = product_type
[docs] def get_folder_id(self): return self._parent_id
[docs] def set_folder_id(self, folder_id): self.set_parent_id(folder_id)
folder_id = property(get_folder_id, set_folder_id)
[docs] def get_product_type(self): return self._product_type
[docs] def set_product_type(self, product_type): self._product_type = product_type
product_type = property(get_product_type, set_product_type)
[docs] def lock(self): super().lock() self._orig_product_type = self._product_type
@property def changes(self): changes = self._get_default_changes() if self._orig_parent_id != self._parent_id: changes["folderId"] = self._parent_id if self._orig_product_type != self._product_type: changes["productType"] = self._product_type return changes
[docs] @classmethod def from_entity_data(cls, product, entity_hub): return cls( name=product["name"], product_type=product["productType"], folder_id=product["folderId"], tags=product["tags"], attribs=product["attrib"], data=product.get("data"), active=product["active"], entity_id=product["id"], created=False, entity_hub=entity_hub )
[docs] def to_create_body_data(self): if self.parent_id is UNKNOWN_VALUE: raise ValueError("Product does not have set 'folder_id'") output = { "name": self.name, "productType": self.product_type, "folderId": self.parent_id, } attrib = self.attribs.to_dict() if attrib: output["attrib"] = attrib if self.active is not UNKNOWN_VALUE: output["active"] = self.active if self.tags: output["tags"] = self.tags if ( self._entity_hub.allow_data_changes and self._data is not UNKNOWN_VALUE ): output["data"] = self._data.get_new_entity_value() return output
[docs]class VersionEntity(BaseEntity): _supports_tags = True _supports_status = True _supports_thumbnail = True entity_type = "version" parent_entity_types = ["product"] def __init__( self, version: int, product_id: Optional["Union[str, _CustomNone]"] = UNKNOWN_VALUE, task_id: Optional["Union[str, _CustomNone]"] = UNKNOWN_VALUE, status: Optional[str] = UNKNOWN_VALUE, tags: Optional[Iterable[str]] = None, attribs: Optional[Dict[str, Any]] = UNKNOWN_VALUE, data: Optional[Dict[str, Any]] = UNKNOWN_VALUE, thumbnail_id: Optional[str] = UNKNOWN_VALUE, active: Optional[bool] = UNKNOWN_VALUE, entity_id: Optional[str] = None, created: Optional[bool] = None, entity_hub: EntityHub = None, ): super().__init__( parent_id=product_id, status=status, tags=tags, attribs=attribs, data=data, thumbnail_id=thumbnail_id, active=active, entity_id=entity_id, created=created, entity_hub=entity_hub, ) self._version = version self._task_id = task_id self._orig_version = version self._orig_task_id = task_id
[docs] def get_version(self): return self._version
[docs] def set_version(self, version): self._version = version
version = property(get_version, set_version)
[docs] def get_product_id(self): return self._parent_id
[docs] def set_product_id(self, product_id): self.set_parent_id(product_id)
product_id = property(get_product_id, set_product_id)
[docs] def get_task_id(self): return self._task_id
[docs] def set_task_id(self, task_id): self._task_id = task_id
task_id = property(get_task_id, set_task_id)
[docs] def lock(self): super().lock() self._orig_version = self._version self._orig_task_id = self._task_id
@property def changes(self): changes = self._get_default_changes() if self._orig_parent_id != self._parent_id: changes["productId"] = self._parent_id if self._orig_task_id != self._task_id: changes["taskId"] = self._task_id return changes
[docs] @classmethod def from_entity_data(cls, version, entity_hub): return cls( version=version["version"], product_id=version["productId"], task_id=version["taskId"], status=version["status"], tags=version["tags"], attribs=version["attrib"], data=version.get("data"), thumbnail_id=version["thumbnailId"], active=version["active"], entity_id=version["id"], created=False, entity_hub=entity_hub )
[docs] def to_create_body_data(self): if self.parent_id is UNKNOWN_VALUE: raise ValueError("Version does not have set 'product_id'") output = { "version": self.version, "productId": self.parent_id, } task_id = self.task_id if task_id: output["taskId"] = task_id attrib = self.attribs.to_dict() if attrib: output["attrib"] = attrib if self.active is not UNKNOWN_VALUE: output["active"] = self.active if self.tags: output["tags"] = self.tags if self.status: output["status"] = self.status if ( self._entity_hub.allow_data_changes and self._data is not UNKNOWN_VALUE ): output["data"] = self._data.get_new_entity_value() return output