import bpy import os import re import urllib3 import traceback import time import uuid from pprint import pprint from bpy.props import PointerProperty, StringProperty from pathlib import Path from vse_toolbox.file_utils import install_module, norm_str from vse_toolbox.resources.trackers.tracker import Tracker try: gazu = install_module('gazu') except Exception as e: print('Could not install gazu') print(e) LOGIN = None class Kitsu(Tracker): url: StringProperty() login: StringProperty() password: StringProperty(subtype='PASSWORD') def connect(self, url=None, login=None, password=None): '''Connect to kitsu api using provided url, login and password''' global LOGIN urllib3.disable_warnings() if url is None: url = self.url if login is None: login = self.login if password is None: password = self.password if not url: print(f'Kitsu Url: {self.url} is empty') return if login == LOGIN: return if not url.endswith('/api'): url += '/api' print(f'Info: Setting Host for kitsu {url}') gazu.client.set_host(url) if not gazu.client.host_is_up(): print('Error: Kitsu Host is down') try: print(f'Info: Log in to kitsu as {login}') res = gazu.log_in(login, password) LOGIN = login print(f'Info: Sucessfully login to Kitsu as {res["user"]["full_name"]}') return res['user'] except Exception as e: print(f'Error: {traceback.format_exc()}') def get_id(self, data): if isinstance(data, str): if self.is_id(data): return data #return None elif isinstance(data, dict): return data['id'] elif data: # Should be a Class return data.id def is_id(self, id): if not isinstance(id, str): return False try: uuid.UUID(id) return True except ValueError: return False def get_project(self, project=None): if project: project_id = self.get_id(project) if project_id: return project_id return gazu.project.get_project_by_name(project) if os.environ.get('TRACKER_PROJECT_ID'): return os.environ['TRACKER_PROJECT_ID'] elif os.environ.get('TRACKER_PROJECT_NAME'): return os.environ['TRACKER_PROJECT_NAME'] def get_projects(self): return gazu.project.all_open_projects() def get_episode(self, episode, project=None): project = self.get_project(project) if episode: episode_id = self.get_id(episode) if episode_id: return episode_id return gazu.shot.get_episode_by_name(project, episode) if os.environ.get('TRACKER_EPISODE_ID'): return os.environ['TRACKER_EPISODE_ID'] elif os.environ.get('TRACKER_EPISODE_NAME'): return os.environ['TRACKER_EPISODE_NAME'] def get_episodes(self, project): return gazu.shot.all_episodes_for_project(project) def get_task_type(self, task_type=None): task_type_id = self.get_id(task_type) if task_type_id: return task_type_id return gazu.task.get_task_type_by_name(task_type) def get_shot_task_types(self, project=None): project = self.get_project(project) task_types = gazu.task.all_task_types_for_project(project) task_types.sort(key=lambda x: x['priority']) return [t for t in task_types if t['for_entity'].lower() == 'shot'] def get_metadata_types(self, project=None): project = self.get_project(project) metadatas = [] for metadata in gazu.project.all_metadata_descriptors(project): if metadata['name']: metadatas.append(metadata) return metadatas def get_task_status(self, status=None): status_id = self.get_id(status) if status_id: return status_id return gazu.client.fetch_first('task-status', {"short_name": status.lower()}) def get_task_statuses(self, project=None): project = self.get_project(project) return gazu.task.all_task_statuses_for_project(project) def get_asset_types(self, project=None): project = self.get_project(project) return gazu.asset.all_asset_types_for_project(project) def get_sequence(self, sequence, episode=None, project=None): #print(f'get_sequence({sequence=}, {project=})') project = self.get_project(project) sequence_id = self.get_id(sequence) if sequence_id: return sequence_id params = dict(project=project, sequence_name=sequence) if episode: params['episode'] = self.get_id(episode) return gazu.shot.get_sequence_by_name(**params) def get_shot(self, shot, sequence, project=None): #print(f'get_shot({shot=}, {sequence=}, {project=})') project = self.get_project(project) sequence = self.get_sequence(sequence, project) if not sequence: return shot_id = self.get_id(shot) if shot_id: return shot_id return gazu.shot.get_shot_by_name(sequence, shot) def get_asset(self, asset, asset_type=None, project=None): #print('get_asset', "name", name, 'asset_type', asset_type) asset_id = self.get_id(asset) if asset_id: return asset_id project = self.get_project(project) asset_type = self.get_id(asset_type) asset = gazu.asset.get_asset_by_name(project, asset, asset_type) return asset def get_assets(self, project=None): project = self.get_project(project) assets = gazu.asset.all_assets_for_project(project) entity_types = self.get_asset_types(project) entity_types_ids = {e['id']: e['name'] for e in entity_types} for asset_data in assets: asset_data['asset_type'] = entity_types_ids[asset_data.pop('entity_type_id')] return assets def get_last_comment(self, task): task = self.get_id(task) return gazu.task.get_last_comment_for_task(task) def get_task(self, task=None, entity=None): entity = self.get_id(entity) task_type = self.get_task_type(task) task = gazu.task.get_task_by_name(entity, task_type) if not task: return #task = gazu.task.get_task(task['id']) task['last_comment'] = self.get_last_comment(task) return task def new_preview(self, task, comment, preview, set_main_preview=False): #print('new_preview', task, comment, preview, set_main_preview) task = self.get_id(task) comment = self.get_id(comment) preview_data = gazu.task.add_preview( task=task, comment=comment, preview_file_path=preview ) if set_main_preview: #print('----', 'Settings') gazu.task.set_main_preview(preview_data) return preview_data def new_comment(self, task, status=None, comment='', preview=None, set_main_preview=False): #print('new_comment', task, status, comment, preview, set_main_preview) #task = self.get_task(task) #print('Add Comment', status) if status is None: #print(task) status = {'id' : task['task_status_id']} else: status = self.get_task_status(status) comment = gazu.task.add_comment( task=task, task_status=status, comment=comment ) if preview: #logging.info(f'Adding Preview to Kitsu {preview}') preview = self.new_preview( task=task, comment=comment, preview=str(preview), set_main_preview=set_main_preview ) return comment def remove_comment(self, comment): return gazu.task.remove_comment(comment) def download_preview(self, preview_id, filepath): if isinstance(filepath, str): filepath = Path(filepath) if filepath.exists(): return filepath.parent.mkdir(parents=True, exist_ok=True) gazu.files.download_preview_file_thumbnail(preview_id, filepath.as_posix()) def new_task(self, entity, task_type, status=None, project=None): task_type = self.get_task_type(task_type) if status: status = self.get_task_status(status) return gazu.task.new_task(entity, task_type=task_type, task_status=status) def new_sequence(self, sequence, episode=None, project=None): project = self.get_project(project) params = dict(name=sequence, project=project) episode = self.get_episode(episode) if episode: params['episode'] = episode return gazu.shot.new_sequence(**params) def new_shot(self, shot, sequence, nb_frames=None, frame_in=None, frame_out=None, description='', custom_data=None, with_tasks=True, project=None): project = self.get_project(project) sequence = self.get_sequence(sequence) custom_data = custom_data or {} if frame_in is not None: custom_data["frame_in"] = frame_in if frame_out is not None: custom_data["frame_out"] = frame_out params = dict(name=shot, data=custom_data, sequence_id=self.get_id(sequence), description=description) if nb_frames is not None: params["nb_frames"] = nb_frames shot = self.get_shot(shot=shot, sequence=sequence) if not shot: path = f"data/projects/{self.get_id(project)}/shots" shot = gazu.client.post(path, params) if with_tasks: for task_type in self.get_shot_task_types(project=project): self.new_task(shot, task_type) return shot def update_data(self, entity, data, name=None, description=None, frames=None, clear=False): if isinstance(entity, dict): entity_id = entity['id'] else: entity_id = self.get_id(entity) entity = gazu.client.fetch_one('entities', entity_id) if data.get('custom_data'): data['data'] = data.pop('custom_data') if name: entity['name'] = name if description: entity['description'] = description if frames: entity['nb_frames'] = frames if clear or not entity['data']: entity['data'] = data else: entity['data'].update(data) #print('######UPDATE DATA') #pprint(entity) entity_data = gazu.client.put(f"data/entities/{entity_id}", entity) #print() #pprint(entity) return entity_data['data'] def get_casting(self, shot, project=None): project = self.get_project(project) project_id = self.get_id(project) return gazu.casting.get_shot_casting({'id': self.get_id(shot), 'project_id': project_id}) def update_casting(self, shot, casting, clear=True, project=None): project = self.get_project(project) shot_id = self.get_id(shot) norm_casting = [] if clear is False: norm_casting += self.get_casting(shot, project) for asset in casting: if isinstance(asset, dict) and 'asset_id' in asset: # It's an asset instance asset_id = asset['asset_id'] nb_occurences = asset['nb_occurences'] else: # It's an asset asset = self.get_asset(asset) nb_occurences = 1 cast = next((c for c in norm_casting if c['asset_id'] == asset_id), None) if cast: cast['nb_occurences'] += 1 else: norm_casting.append({'asset_id': asset_id, 'nb_occurences': nb_occurences}) return gazu.casting.update_shot_casting(project, shot_id, norm_casting) def draw_prefs(self, layout): layout.prop(self, 'url', text='Url') layout.prop(self, 'login', text='Login') layout.prop(self, 'password', text='Password')