vse_toolbox/resources/trackers/kitsu.py

454 lines
14 KiB
Python

import bpy
import os
from os.path import expandvars
import re
import urllib3
import traceback
import time
import uuid
from pprint import pprint
from bpy.props import PointerProperty, StringProperty
from pathlib import Path
from vse_toolbox.file_utils import install_module, norm_str
from vse_toolbox.resources.trackers.tracker import Tracker
try:
gazu = install_module('gazu')
except Exception as e:
print('Could not install gazu')
print(e)
LOGIN = None
class Kitsu(Tracker):
url: StringProperty()
login: StringProperty()
password: StringProperty(subtype='PASSWORD')
admin_login : StringProperty()
admin_password : StringProperty(subtype='PASSWORD')
def admin_connect(self):
url = self.url
if not url.endswith('/api'):
url += '/api'
login = expandvars(self.admin_login or self.login)
password = expandvars(self.admin_password or self.password)
try:
res = gazu.log_in(login, password)
LOGIN = login
return res['user']
except Exception as e:
print(e)
def connect(self):
'''Connect to kitsu api using provided url, login and password'''
global LOGIN
urllib3.disable_warnings()
url = expandvars(self.url)
login = expandvars(self.login)
password = expandvars(self.password)
if not url:
print(f'Kitsu Url: {self.url} is empty')
return
if login == LOGIN:
return
if not url.endswith('/api'):
url += '/api'
print(f'Info: Setting Host for kitsu {url}')
gazu.client.set_host(url)
if not gazu.client.host_is_up():
print('Error: Kitsu Host is down')
#try:
print(f'Info: Log in to kitsu as {login}')
res = gazu.log_in(login, password)
LOGIN = login
print(f'Info: successfully login to Kitsu as {res["user"]["full_name"]}')
return res['user']
#except Exception as e:
# print(f'Error: {traceback.format_exc()}')
def get_id(self, data):
if isinstance(data, str):
if self.is_id(data):
return data
#return None
elif isinstance(data, dict):
return data['id']
elif data: # Should be a Class
return data.id
def is_id(self, id):
if not isinstance(id, str):
return False
try:
uuid.UUID(id)
return True
except ValueError:
return False
def get_project(self, project=None):
if project:
project_id = self.get_id(project)
if project_id:
return project_id
return gazu.project.get_project_by_name(project)
if os.environ.get('TRACKER_PROJECT_ID'):
return os.environ['TRACKER_PROJECT_ID']
elif os.environ.get('TRACKER_PROJECT_NAME'):
return os.environ['TRACKER_PROJECT_NAME']
def get_projects(self):
return gazu.project.all_open_projects()
def get_episode(self, episode, project=None):
project = self.get_project(project)
if episode:
episode_id = self.get_id(episode)
if episode_id:
return episode_id
return gazu.shot.get_episode_by_name(project, episode)
if os.environ.get('TRACKER_EPISODE_ID'):
return os.environ['TRACKER_EPISODE_ID']
elif os.environ.get('TRACKER_EPISODE_NAME'):
return os.environ['TRACKER_EPISODE_NAME']
def get_episodes(self, project):
return gazu.shot.all_episodes_for_project(project)
def get_task_type(self, task_type=None):
task_type_id = self.get_id(task_type)
if task_type_id:
return task_type_id
return gazu.task.get_task_type_by_name(task_type)
def get_shot_task_types(self, project=None):
project = self.get_project(project)
task_types = gazu.task.all_task_types_for_project(project)
task_types.sort(key=lambda x: x['priority'])
return [t for t in task_types if t['for_entity'].lower() == 'shot']
def get_metadata_types(self, project=None):
project = self.get_project(project)
try :
metadatas = gazu.project.all_metadata_descriptors(project)
except gazu.exception.NotAllowedException:
print('No autorized to fetch metadata')
return []
return [m for m in metadatas if m['name']]
def get_task_status(self, status=None):
status_id = self.get_id(status)
if status_id:
return status_id
return gazu.client.fetch_first('task-status', {"short_name": status.lower()})
def get_task_statuses(self, project=None):
project = self.get_project(project)
return gazu.task.all_task_statuses_for_project(project)
def get_asset_types(self, project=None):
project = self.get_project(project)
return gazu.asset.all_asset_types_for_project(project)
def get_sequence(self, sequence, episode=None, project=None):
#print(f'get_sequence({sequence=}, {project=})')
project = self.get_project(project)
sequence_id = self.get_id(sequence)
if sequence_id:
return sequence_id
params = dict(project=project, sequence_name=sequence)
if episode:
params['episode'] = self.get_id(episode)
return gazu.shot.get_sequence_by_name(**params)
def get_sequences(self, project=None, episode=None):
#print(f'get_sequence({sequence=}, {project=})')
project = self.get_project(project)
if episode:
episode = self.get_episode(episode)
return gazu.shot.all_sequences_for_episode(episode)
else:
return gazu.shot.all_sequences_for_project(project)
def get_shot(self, shot, sequence, project=None):
#print(f'get_shot({shot=}, {sequence=}, {project=})')
project = self.get_project(project)
sequence = self.get_sequence(sequence, project)
if not sequence:
return
shot_id = self.get_id(shot)
if shot_id:
return shot_id
return gazu.shot.get_shot_by_name(sequence, shot)
def get_shots(self, sequence):
#print(f'get_sequence({sequence=}, {project=})')
#project = self.get_project(project)
return gazu.shot.all_shots_for_sequence(sequence)
def get_asset(self, asset, asset_type=None, project=None):
#print('get_asset', "name", name, 'asset_type', asset_type)
asset_id = self.get_id(asset)
if asset_id:
return asset_id
project = self.get_project(project)
asset_type = self.get_id(asset_type)
asset = gazu.asset.get_asset_by_name(project, asset, asset_type)
return asset
def get_assets(self, project=None):
project = self.get_project(project)
assets = gazu.asset.all_assets_for_project(project)
entity_types = self.get_asset_types(project)
entity_types_ids = {e['id']: e['name'] for e in entity_types}
for asset_data in assets:
asset_data['asset_type'] = entity_types_ids[asset_data.pop('entity_type_id')]
return assets
def get_last_comment(self, task):
task = self.get_id(task)
return gazu.task.get_last_comment_for_task(task)
def get_last_comment_with_preview(self, task):
task = self.get_id(task)
comments = gazu.task.all_comments_for_task(task)
for comment in comments:
if comment['previews']:
return comment
def download_preview_file(self, preview, filepath):
preview_id = self.get_id(preview)
return gazu.files.download_preview_file(preview_id, str(filepath))
def get_shots_search_url(self, shot_names, project=None, episode=None):
project = self.get_project(project)
url = gazu.client.get_host().replace('/api', f'/productions/{project}')
if episode:
episode = self.get_episode(episode)
url += f'/episodes/{episode["id"]}'
url += f'/shots'
return f'{url}?search={" ".join(shot_names)}'
def get_task(self, task=None, entity=None):
entity = self.get_id(entity)
task_type = self.get_task_type(task)
task = gazu.task.get_task_by_name(entity, task_type)
if not task:
return
#task = gazu.task.get_task(task['id'])
task['last_comment'] = self.get_last_comment(task)
return task
def set_main_preview(self, preview_data):
gazu.task.set_main_preview(preview_data)
def new_preview(self, task, comment, preview, set_main_preview=False):
#print('new_preview', task, comment, preview, set_main_preview)
task = self.get_id(task)
comment = self.get_id(comment)
preview_data = gazu.task.add_preview(
task=task,
comment=comment,
preview_file_path=preview )
if set_main_preview:
#print('----', 'Settings')
self.set_main_preview(preview_data)
return preview_data
def new_comment(self, task, status=None, comment='', preview=None, set_main_preview=False):
#print('new_comment', task, status, comment, preview, set_main_preview)
#task = self.get_task(task)
#print('Add Comment', status)
if status is None:
#print(task)
status = {'id' : task['task_status_id']}
else:
status = self.get_task_status(status)
comment = gazu.task.add_comment(
task=task,
task_status=status,
comment=comment )
if preview:
#logging.info(f'Adding Preview to Kitsu {preview}')
preview = self.new_preview(
task=task,
comment=comment,
preview=str(preview),
set_main_preview=set_main_preview )
return comment
def remove_comment(self, comment):
return gazu.task.remove_comment(comment)
def download_preview(self, preview_id, filepath):
if isinstance(filepath, str):
filepath = Path(filepath)
if filepath.exists():
return
filepath.parent.mkdir(parents=True, exist_ok=True)
gazu.files.download_preview_file_thumbnail(preview_id, filepath.as_posix())
def new_task(self, entity, task_type, status=None, project=None):
task_type = self.get_task_type(task_type)
if status:
status = self.get_task_status(status)
return gazu.task.new_task(entity, task_type=task_type, task_status=status)
def new_sequence(self, sequence, episode=None, project=None):
project = self.get_project(project)
params = dict(name=sequence, project=project)
episode = self.get_episode(episode)
if episode:
params['episode'] = episode
return gazu.shot.new_sequence(**params)
def new_shot(self, shot, sequence, nb_frames=None, frame_in=None,
frame_out=None, description='', custom_data=None, with_tasks=True, project=None):
project = self.get_project(project)
sequence = self.get_sequence(sequence)
custom_data = custom_data or {}
if frame_in is not None:
custom_data["frame_in"] = frame_in
if frame_out is not None:
custom_data["frame_out"] = frame_out
params = dict(name=shot, data=custom_data,
sequence_id=self.get_id(sequence), description=description)
if nb_frames is not None:
params["nb_frames"] = nb_frames
shot = self.get_shot(shot=shot, sequence=sequence)
if not shot:
path = f"data/projects/{self.get_id(project)}/shots"
shot = gazu.client.post(path, params)
if with_tasks:
for task_type in self.get_shot_task_types(project=project):
self.new_task(shot, task_type)
return shot
def update_data(self, entity, custom_data={}, name=None, description=None, frames=None, clear=False):
if isinstance(entity, dict):
entity_id = entity['id']
else:
entity_id = self.get_id(entity)
entity = gazu.client.fetch_one('entities', entity_id)
if name:
entity['name'] = name
if description:
entity['description'] = description
if frames:
entity['nb_frames'] = frames
if clear or not entity['data']:
entity['data'] = custom_data
else:
entity['data'].update(custom_data)
#print('######UPDATE DATA')
#pprint(entity)
entity_data = gazu.client.put(f"data/entities/{entity_id}", entity)
#print()
#pprint(entity)
return entity_data['data']
def get_casting(self, shot, project=None):
project = self.get_project(project)
project_id = self.get_id(project)
return gazu.casting.get_shot_casting({'id': self.get_id(shot), 'project_id': project_id})
def update_casting(self, shot, casting, clear=True, project=None):
project = self.get_project(project)
shot_id = self.get_id(shot)
norm_casting = []
if clear is False:
norm_casting += self.get_casting(shot, project)
for asset in casting:
if isinstance(asset, dict) and 'asset_id' in asset: # It's an asset instance
asset_id = asset['asset_id']
nb_occurences = asset['nb_occurences']
else: # It's an asset
asset = self.get_asset(asset)
nb_occurences = 1
cast = next((c for c in norm_casting if c['asset_id'] == asset_id), None)
if cast:
cast['nb_occurences'] += 1
else:
norm_casting.append({'asset_id': asset_id, 'nb_occurences': nb_occurences})
return gazu.casting.update_shot_casting(project, shot_id, norm_casting)
def draw_prefs(self, layout):
col = layout.column(align=False)
col.prop(self, 'url', text='Url')
col.prop(self, 'login', text='Login')
col.prop(self, 'password', text='Password')
col.separator()
col.prop(self, 'admin_login', text='Admin Login')
col.prop(self, 'admin_password', text='Admin Password')