Source code for ayon_api.graphql

import copy
import numbers
from abc import ABC, abstractmethod
from typing import Optional, Iterable

from .exceptions import GraphQlQueryFailed
from .utils import SortOrder

FIELD_VALUE = object()


[docs]def fields_to_dict(fields): output = {} if not fields: return output for field in fields: hierarchy = field.split(".") last = hierarchy.pop(-1) value = output for part in hierarchy: if value is FIELD_VALUE: break if part not in value: value[part] = {} value = value[part] if value is not FIELD_VALUE: value[last] = FIELD_VALUE return output
[docs]class QueryVariable(object): """Object representing single varible used in GraphQlQuery. Variable definition is in GraphQl query header but it's value is used in fields. Args: variable_name (str): Name of variable in query. """ def __init__(self, variable_name): self._variable_name = variable_name self._name = "${}".format(variable_name) @property def name(self): """Name used in field filter.""" return self._name @property def variable_name(self): """Name of variable in query definition.""" return self._variable_name def __hash__(self): return self._name.__hash__() def __str__(self): return self._name def __format__(self, *args, **kwargs): return self._name.__format__(*args, **kwargs)
[docs]class GraphQlQuery: """GraphQl query which can have fields to query. Single use object which can be used only for one query. Object and children objects keep track about paging and progress. Args: name (str): Name of query. """ offset = 2 def __init__(self, name, order=None): self._name = name self._variables = {} self._children = [] self._has_multiple_edge_fields = None self._order = SortOrder.parse_value(order, SortOrder.ascending) @property def indent(self): """Indentation for preparation of query string. Returns: int: Ident spaces. """ return 0 @property def child_indent(self): """Indentation for preparation of query string used by children. Returns: int: Ident spaces for children. """ return self.indent @property def need_query(self): """Still need query from server. Needed for edges which use pagination. Returns: bool: If still need query from server. """ for child in self._children: if child.need_query: return True return False @property def has_multiple_edge_fields(self): if self._has_multiple_edge_fields is None: edge_counter = 0 for child in self._children: edge_counter += child.sum_edge_fields(2) if edge_counter > 1: break self._has_multiple_edge_fields = edge_counter > 1 return self._has_multiple_edge_fields
[docs] def add_variable(self, key, value_type, value=None): """Add variable to query. Args: key (str): Variable name. value_type (str): Type of expected value in variables. This is graphql type e.g. "[String!]", "Int", "Boolean", etc. value (Any): Default value for variable. Can be changed later. Returns: QueryVariable: Created variable object. Raises: KeyError: If variable was already added before. """ if key in self._variables: raise KeyError( "Variable \"{}\" was already set with type {}.".format( key, value_type ) ) variable = QueryVariable(key) self._variables[key] = { "type": value_type, "variable": variable, "value": value } return variable
[docs] def get_variable(self, key): """Variable object. Args: key (str): Variable name added to headers. Returns: QueryVariable: Variable object used in query string. """ return self._variables[key]["variable"]
[docs] def get_variable_value(self, key, default=None): """Get Current value of variable. Args: key (str): Variable name. default (Any): Default value if variable is available. Returns: Any: Variable value. """ variable_item = self._variables.get(key) if variable_item: return variable_item["value"] return default
[docs] def set_variable_value(self, key, value): """Set value for variable. Args: key (str): Variable name under which the value is stored. value (Any): Variable value used in query. Variable is not used if value is 'None'. """ self._variables[key]["value"] = value
[docs] def get_variable_keys(self): """Get all variable keys. Returns: set[str]: Variable keys. """ return set(self._variables.keys())
[docs] def get_variables_values(self): """Calculate variable values used that should be used in query. Variables with value set to 'None' are skipped. Returns: Dict[str, Any]: Variable values by their name. """ output = {} for key, item in self._variables.items(): value = item["value"] if value is not None: output[key] = item["value"] return output
[docs] def add_obj_field(self, field): """Add field object to children. Args: field (BaseGraphQlQueryField): Add field to query children. """ if field in self._children: return self._children.append(field) field.set_parent(self)
[docs] def add_field_with_edges(self, name): """Add field with edges to query. Args: name (str): Field name e.g. 'tasks'. Returns: GraphQlQueryEdgeField: Created field object. """ item = GraphQlQueryEdgeField(name, self, self._order) self.add_obj_field(item) return item
[docs] def add_field(self, name): """Add field to query. Args: name (str): Field name e.g. 'id'. Returns: GraphQlQueryField: Created field object. """ item = GraphQlQueryField(name, self, self._order) self.add_obj_field(item) return item
[docs] def get_field_by_keys( self, keys: Iterable[str] ) -> Optional["BaseGraphQlQueryField"]: keys = list(keys) if not keys: return None key = keys.pop(0) for child in self._children: if child.name == key: return child.get_field_by_keys(keys) return None
[docs] def get_field_by_path( self, path: str ) -> Optional["BaseGraphQlQueryField"]: return self.get_field_by_keys(path.split("/"))
[docs] def calculate_query(self): """Calculate query string which is sent to server. Returns: str: GraphQl string with variables and headers. Raises: ValueError: Query has no fiels. """ if not self._children: raise ValueError("Missing fields to query") variables = [] for item in self._variables.values(): if item["value"] is None: continue variables.append( "{}: {}".format(item["variable"], item["type"]) ) variables_str = "" if variables: variables_str = "({})".format(",".join(variables)) header = "query {}{}".format(self._name, variables_str) output = [] output.append(header + " {") for field in self._children: output.append(field.calculate_query()) output.append("}") return "\n".join(output)
[docs] def parse_result(self, data, output, progress_data): """Parse data from response for output. Output is stored to passed 'output' variable. That's because of paging during which objects must have access to both new and previous values. Args: data (Dict[str, Any]): Data received using calculated query. output (Dict[str, Any]): Where parsed data are stored. progress_data (Dict[str, Any]): Data used for paging. """ if not data: return for child in self._children: child.parse_result(data, output, progress_data)
[docs] def query(self, con): """Do a query from server. Args: con (ServerAPI): Connection to server with 'query' method. Returns: Dict[str, Any]: Parsed output from GraphQl query. """ progress_data = {} output = {} while self.need_query: query_str = self.calculate_query() variables = self.get_variables_values() response = con.query_graphql( query_str, self.get_variables_values() ) if response.errors: raise GraphQlQueryFailed(response.errors, query_str, variables) self.parse_result(response.data["data"], output, progress_data) return output
[docs] def continuous_query(self, con): """Do a query from server. Args: con (ServerAPI): Connection to server with 'query' method. Returns: Dict[str, Any]: Parsed output from GraphQl query. """ progress_data = {} if self.has_multiple_edge_fields: output = {} while self.need_query: query_str = self.calculate_query() variables = self.get_variables_values() response = con.query_graphql(query_str, variables) if response.errors: raise GraphQlQueryFailed( response.errors, query_str, variables ) self.parse_result(response.data["data"], output, progress_data) yield output else: while self.need_query: output = {} query_str = self.calculate_query() variables = self.get_variables_values() response = con.query_graphql(query_str, variables) if response.errors: raise GraphQlQueryFailed( response.errors, query_str, variables ) self.parse_result(response.data["data"], output, progress_data) yield output
[docs]class BaseGraphQlQueryField(ABC): """Field in GraphQl query. Args: name (str): Name of field. parent (Union[BaseGraphQlQueryField, GraphQlQuery]): Parent object of a field. """ def __init__(self, name, parent, order): if isinstance(parent, GraphQlQuery): query_item = parent else: query_item = parent.query_item self._name = name self._parent = parent self._filters = {} self._children = [] # Value is changed on first parse of result self._need_query = True self._query_item = query_item self._path = None self._limit = None self._order = order self._fetched_counter = 0 def __repr__(self): return "<{} {}>".format(self.__class__.__name__, self.path)
[docs] def get_name(self) -> str: return self._name
name = property(get_name)
[docs] def get_field_by_keys(self, keys: Iterable[str]): keys = list(keys) if not keys: return self key = keys.pop(0) for child in self._children: if child.name == key: return child.get_field_by_keys(keys) return None
[docs] def set_limit(self, limit: Optional[int]): self._limit = limit
[docs] def set_order(self, order): order = SortOrder.parse_value(order) if order is None: raise ValueError( f"Got invalid value {order}." f" Expected {SortOrder.ascending} or {SortOrder.descending}" ) self._order = order
[docs] def set_ascending_order(self, enabled=True): self.set_order( SortOrder.ascending if enabled else SortOrder.descending )
[docs] def set_descending_order(self, enabled=True): self.set_ascending_order(not enabled)
[docs] def add_variable(self, key, value_type, value=None): """Add variable to query. Args: key (str): Variable name. value_type (str): Type of expected value in variables. This is graphql type e.g. "[String!]", "Int", "Boolean", etc. value (Any): Default value for variable. Can be changed later. Returns: QueryVariable: Created variable object. Raises: KeyError: If variable was already added before. """ return self._parent.add_variable(key, value_type, value)
[docs] def get_variable(self, key): """Variable object. Args: key (str): Variable name added to headers. Returns: QueryVariable: Variable object used in query string. """ return self._parent.get_variable(key)
@property def need_query(self): """Still need query from server. Needed for edges which use pagination. Look into children values too. Returns: bool: If still need query from server. """ if self._need_query: return True for child in self._children_iter(): if child.need_query: return True return False def _children_iter(self): """Iterate over all children fields of object. Returns: Iterator[BaseGraphQlQueryField]: Children fields. """ for child in self._children: yield child
[docs] def sum_edge_fields(self, max_limit=None): """Check how many edge fields query has. In case there are multiple edge fields or are nested the query can't yield mid cursor results. Args: max_limit (int): Skip rest of counting if counter is bigger then entered number. Returns: int: Counter edge fields """ counter = 0 if isinstance(self, GraphQlQueryEdgeField): counter = 1 for child in self._children_iter(): counter += child.sum_edge_fields(max_limit) if max_limit is not None and counter >= max_limit: break return counter
@property def offset(self): return self._query_item.offset @property def indent(self): return self._parent.child_indent + self.offset @property @abstractmethod def child_indent(self): pass @property def query_item(self): return self._query_item @property @abstractmethod def has_edges(self): pass @property def child_has_edges(self): for child in self._children_iter(): if child.has_edges or child.child_has_edges: return True return False @property def path(self): """Field path for debugging purposes. Returns: str: Field path in query. """ if self._path is None: if isinstance(self._parent, GraphQlQuery): path = self._name else: path = "/".join((self._parent.path, self._name)) self._path = path return self._path
[docs] def reset_cursor(self): for child in self._children_iter(): child.reset_cursor()
[docs] def get_variable_value(self, *args, **kwargs): return self._query_item.get_variable_value(*args, **kwargs)
[docs] def set_variable_value(self, *args, **kwargs): return self._query_item.set_variable_value(*args, **kwargs)
[docs] def set_filter(self, key, value): self._filters[key] = value
[docs] def has_filter(self, key): return key in self._filters
[docs] def remove_filter(self, key): self._filters.pop(key, None)
[docs] def set_parent(self, parent): if self._parent is parent: return self._parent = parent parent.add_obj_field(self)
[docs] def add_obj_field(self, field): if field in self._children: return self._children.append(field) field.set_parent(self)
[docs] def add_field_with_edges(self, name): item = GraphQlQueryEdgeField(name, self, self._order) self.add_obj_field(item) return item
[docs] def add_field(self, name): item = GraphQlQueryField(name, self, self._order) self.add_obj_field(item) return item
def _filter_value_to_str(self, value): if isinstance(value, QueryVariable): if self.get_variable_value(value.variable_name) is None: return None return str(value) if isinstance(value, numbers.Number): return str(value) if isinstance(value, str): return '"{}"'.format(value) if isinstance(value, (list, set, tuple)): return "[{}]".format( ", ".join( self._filter_value_to_str(item) for item in iter(value) ) ) raise TypeError( "Unknown type to convert '{}'".format(str(type(value))) )
[docs] def get_filters(self): """Receive filters for item. By default just use copy of set filters. Returns: Dict[str, Any]: Fields filters. """ return copy.deepcopy(self._filters)
def _filters_to_string(self): filters = self.get_filters() if not filters: return "" filter_items = [] for key, value in filters.items(): string_value = self._filter_value_to_str(value) if string_value is None: continue filter_items.append("{}: {}".format(key, string_value)) if not filter_items: return "" return "({})".format(", ".join(filter_items)) def _fake_children_parse(self): """Mark children as they don't need query.""" for child in self._children_iter(): child.parse_result({}, {}, {})
[docs] @abstractmethod def calculate_query(self): pass
[docs] @abstractmethod def parse_result(self, data, output, progress_data): pass
[docs]class GraphQlQueryField(BaseGraphQlQueryField): has_edges = False @property def child_indent(self): return self.indent
[docs] def parse_result(self, data, output, progress_data): if not isinstance(data, dict): raise TypeError("{} Expected 'dict' type got '{}'".format( self._name, str(type(data)) )) self._need_query = False value = data.get(self._name) if value is None: self._fake_children_parse() if self._name in data: output[self._name] = None return if not self._children: output[self._name] = value return output_value = output.get(self._name) if isinstance(value, dict): if output_value is None: output_value = {} output[self._name] = output_value for child in self._children: child.parse_result(value, output_value, progress_data) return if output_value is None: output_value = [] output[self._name] = output_value if not value: self._fake_children_parse() return diff = len(value) - len(output_value) if diff > 0: for _ in range(diff): output_value.append({}) for idx, item in enumerate(value): item_value = output_value[idx] for child in self._children: child.parse_result(item, item_value, progress_data)
[docs] def calculate_query(self): offset = self.indent * " " header = "{}{}{}".format( offset, self._name, self._filters_to_string() ) if not self._children: return header output = [] output.append(header + " {") output.extend([ field.calculate_query() for field in self._children ]) output.append(offset + "}") return "\n".join(output)
[docs]class GraphQlQueryEdgeField(BaseGraphQlQueryField): has_edges = True def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._cursor = None self._edge_children = [] @property def child_indent(self): offset = self.offset * 2 return self.indent + offset def _children_iter(self): for child in super()._children_iter(): yield child for child in self._edge_children: yield child
[docs] def add_obj_field(self, field): if field in self._edge_children: return super().add_obj_field(field)
[docs] def add_obj_edge_field(self, field): if field in self._edge_children or field in self._children: return self._edge_children.append(field) field.set_parent(self)
[docs] def add_edge_field(self, name): item = GraphQlQueryField(name, self, self._order) self.add_obj_edge_field(item) return item
[docs] def reset_cursor(self): # Reset cursor only for edges self._cursor = None self._need_query = True super().reset_cursor()
[docs] def parse_result(self, data, output, progress_data): if not isinstance(data, dict): raise TypeError("{} Expected 'dict' type got '{}'".format( self._name, str(type(data)) )) value = data.get(self._name) if value is None: self._fake_children_parse() self._need_query = False return if self._name in output: node_values = output[self._name] else: node_values = [] output[self._name] = node_values handle_cursors = self.child_has_edges if handle_cursors: cursor_key = self._get_cursor_key() if cursor_key in progress_data: nodes_by_cursor = progress_data[cursor_key] else: nodes_by_cursor = {} progress_data[cursor_key] = nodes_by_cursor page_info = value["pageInfo"] new_cursor = page_info["endCursor"] self._need_query = page_info["hasNextPage"] edges = value["edges"] # Fake result parse if not edges: self._fake_children_parse() self._fetched_counter += len(edges) if self._limit and self._fetched_counter >= self._limit: self._need_query = False for edge in edges: if not handle_cursors: edge_value = {} node_values.append(edge_value) else: edge_cursor = edge["cursor"] edge_value = nodes_by_cursor.get(edge_cursor) if edge_value is None: edge_value = {} nodes_by_cursor[edge_cursor] = edge_value node_values.append(edge_value) for child in self._edge_children: child.parse_result(edge, edge_value, progress_data) for child in self._children: child.parse_result(edge["node"], edge_value, progress_data) if not self._need_query: return change_cursor = True for child in self._children_iter(): if child.need_query: change_cursor = False if change_cursor: for child in self._children_iter(): child.reset_cursor() self._cursor = new_cursor
def _get_cursor_key(self): return "{}/__cursor__".format(self.path)
[docs] def get_filters(self): filters = super().get_filters() limit_key = "first" if self._order == SortOrder.descending: limit_key = "last" limit_amount = 300 if self._limit: total = self._fetched_counter + limit_amount if total > self._limit: limit_amount = self._limit - self._fetched_counter filters[limit_key] = limit_amount if self._cursor: filters["after"] = self._cursor return filters
[docs] def calculate_query(self): if not self._children and not self._edge_children: raise ValueError("Missing child definitions for edges {}".format( self.path )) offset = self.indent * " " header = "{}{}{}".format( offset, self._name, self._filters_to_string() ) output = [] output.append(header + " {") edges_offset = offset + self.offset * " " node_offset = edges_offset + self.offset * " " output.append(edges_offset + "edges {") for field in self._edge_children: output.append(field.calculate_query()) if self._children: output.append(node_offset + "node {") for field in self._children: output.append( field.calculate_query() ) output.append(node_offset + "}") if self.child_has_edges: output.append(node_offset + "cursor") output.append(edges_offset + "}") # Add page information output.append(edges_offset + "pageInfo {") for page_key in ( "endCursor", "hasNextPage", ): output.append(node_offset + page_key) output.append(edges_offset + "}") output.append(offset + "}") return "\n".join(output)
INTROSPECTION_QUERY = """ query IntrospectionQuery { __schema { queryType { name } mutationType { name } subscriptionType { name } types { ...FullType } directives { name description locations args { ...InputValue } } } } fragment FullType on __Type { kind name description fields(includeDeprecated: true) { name description args { ...InputValue } type { ...TypeRef } isDeprecated deprecationReason } inputFields { ...InputValue } interfaces { ...TypeRef } enumValues(includeDeprecated: true) { name description isDeprecated deprecationReason } possibleTypes { ...TypeRef } } fragment InputValue on __InputValue { name description type { ...TypeRef } defaultValue } fragment TypeRef on __Type { kind name ofType { kind name ofType { kind name ofType { kind name ofType { kind name ofType { kind name ofType { kind name ofType { kind name } } } } } } } } """