Joseph HENRY 01fccc63d9 fix update_data: PUT only changed fields, not the whole entity
PUTting the full entity dict back to data/entities/{id} sends server-managed
fields (id, created_at, updated_at, project_id, entity_type_id, ...) that
newer Kitsu rejects with ParameterException. Build a partial payload with
only name/description/nb_frames/data — and only fetch/merge existing data
when the caller is actually updating custom_data.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 17:11:05 +02:00

480 lines
16 KiB
Python

import bpy
import os
from os.path import expandvars
import re
import urllib3
import traceback
import time
import uuid
from pprint import pprint
from bpy.props import PointerProperty, StringProperty
from pathlib import Path
from vse_toolbox.file_utils import install_module, norm_str
from vse_toolbox.resources.trackers.tracker import Tracker
# Bind the gazu client module through the project's install helper.
# A failure is reported on stdout and swallowed so that importing this
# module does not raise; in that case the name ``gazu`` stays undefined.
try:
    gazu = install_module('gazu', package_name='gazu>=1.0,<2.0')
except Exception as e:
    print('Could not install gazu', e, sep='\n')

# Login of the account used for the last successful Kitsu login.
# Kitsu.connect() compares against it to skip a redundant log-in.
LOGIN = None
class Kitsu(Tracker):
    """Tracker implementation backed by a Kitsu server through ``gazu``.

    Most ``get_*`` helpers accept either an id string, a dict carrying an
    ``'id'`` key, an object exposing ``.id`` or a name, and resolve it with
    :meth:`get_id` before falling back to a lookup by name.
    """

    # Connection settings; values go through ``expandvars`` before use, so
    # they may contain environment variable references.
    url: StringProperty()
    login: StringProperty()
    password: StringProperty(subtype='PASSWORD')

    # Optional credentials used by admin_connect(); the regular ones are
    # used when these are empty.
    admin_login: StringProperty()
    admin_password: StringProperty(subtype='PASSWORD')

    def admin_connect(self):
        """Log in with the admin credentials (or the regular ones if unset).

        Returns:
            The ``'user'`` entry of the gazu login response, or ``None``
            when the login raised (the exception is printed).

        Raises:
            ConnectionError: if no URL is configured.
        """
        # Without this declaration the assignment below only created a local
        # and connect() kept believing the previous account was logged in.
        global LOGIN

        url = expandvars(self.url)
        if not url:
            raise ConnectionError("No Kitsu URL configured")
        if not url.endswith('/api'):
            url += '/api'
        gazu.client.set_host(url)

        login = expandvars(self.admin_login or self.login)
        password = expandvars(self.admin_password or self.password)

        try:
            res = gazu.log_in(login, password)
            LOGIN = login
            return res['user']
        except Exception as e:
            # Best effort: report the failure and return None.
            print(e)

    def connect(self):
        """Connect to kitsu api using provided url, login and password.

        Returns:
            The ``'user'`` entry of the login response, or ``None`` when the
            URL is empty or the same login is already recorded in ``LOGIN``.
        """
        global LOGIN

        urllib3.disable_warnings()

        url = expandvars(self.url)
        login = expandvars(self.login)
        password = expandvars(self.password)

        if not url:
            print(f'Kitsu Url: {self.url} is empty')
            return

        # Already logged in with this account: nothing to do.
        if login == LOGIN:
            return

        if not url.endswith('/api'):
            url += '/api'

        print(f'Info: Setting Host for kitsu {url}')
        gazu.client.set_host(url)

        if not gazu.client.host_is_up():
            # Only reported; the login below is still attempted.
            print('Error: Kitsu Host is down')

        print(f'Info: Log in to kitsu as {login}')
        res = gazu.log_in(login, password)
        LOGIN = login
        print(f'Info: successfully login to Kitsu as {res["user"]["full_name"]}')
        return res['user']

    def get_id(self, data):
        """Extract an id from *data*.

        Returns *data* itself when it is a UUID string, ``data['id']`` for a
        dict, ``data.id`` for any other truthy object, and ``None`` otherwise
        (including strings that are not UUIDs).
        """
        if isinstance(data, str):
            return data if self.is_id(data) else None
        if isinstance(data, dict):
            return data['id']
        if data:  # Should be a Class
            return data.id
        return None

    def is_id(self, id):
        """Return True when *id* is a string that parses as a UUID."""
        if not isinstance(id, str):
            return False
        try:
            uuid.UUID(id)
        except ValueError:
            return False
        return True

    def get_project(self, project=None):
        """Resolve *project* to an id, or look it up by name.

        When *project* is falsy, the ``TRACKER_PROJECT_ID`` then
        ``TRACKER_PROJECT_NAME`` environment variables are returned as is.
        Returns ``None`` when nothing is available.
        """
        if project:
            project_id = self.get_id(project)
            if project_id:
                return project_id
            return gazu.project.get_project_by_name(project)

        if os.environ.get('TRACKER_PROJECT_ID'):
            return os.environ['TRACKER_PROJECT_ID']
        elif os.environ.get('TRACKER_PROJECT_NAME'):
            return os.environ['TRACKER_PROJECT_NAME']
        return None

    def get_projects(self):
        """Return the result of ``gazu.project.all_open_projects()``."""
        return gazu.project.all_open_projects()

    def get_episode(self, episode, project=None):
        """Resolve *episode* to an id, or look it up by name in *project*.

        When *episode* is falsy, the ``TRACKER_EPISODE_ID`` then
        ``TRACKER_EPISODE_NAME`` environment variables are returned as is.
        """
        project = self.get_project(project)
        if episode:
            episode_id = self.get_id(episode)
            if episode_id:
                return episode_id
            return gazu.shot.get_episode_by_name(project, episode)

        if os.environ.get('TRACKER_EPISODE_ID'):
            return os.environ['TRACKER_EPISODE_ID']
        elif os.environ.get('TRACKER_EPISODE_NAME'):
            return os.environ['TRACKER_EPISODE_NAME']
        return None

    def get_episodes(self, project):
        """Return ``gazu.shot.all_episodes_for_project(project)``."""
        return gazu.shot.all_episodes_for_project(project)

    def get_task_type(self, task_type=None):
        """Resolve *task_type* to an id, or look it up by name."""
        task_type_id = self.get_id(task_type)
        if task_type_id:
            return task_type_id
        return gazu.task.get_task_type_by_name(task_type)

    def get_shot_task_types(self, project=None):
        """Return the project's task types for shots, sorted by priority."""
        project = self.get_project(project)
        task_types = gazu.task.all_task_types_for_project(project)
        task_types.sort(key=lambda x: x['priority'])
        return [t for t in task_types if t['for_entity'].lower() == 'shot']

    def get_metadata_types(self, project=None):
        """Return the project's metadata descriptors that have a name.

        Returns an empty list when gazu raises ``NotAllowedException``.
        """
        project = self.get_project(project)
        try:
            metadatas = gazu.project.all_metadata_descriptors(project)
        except gazu.exception.NotAllowedException:
            print('No autorized to fetch metadata')
            return []
        return [m for m in metadatas if m['name']]

    def get_task_status(self, status=None):
        """Resolve *status* to an id, or fetch it by lower-cased short name."""
        status_id = self.get_id(status)
        if status_id:
            return status_id
        return gazu.client.fetch_first('task-status', {"short_name": status.lower()})

    def get_task_statuses(self, project=None):
        """Return ``gazu.task.all_task_statuses_for_project`` for *project*."""
        project = self.get_project(project)
        return gazu.task.all_task_statuses_for_project(project)

    def get_asset_types(self, project=None):
        """Return ``gazu.asset.all_asset_types_for_project`` for *project*."""
        project = self.get_project(project)
        return gazu.asset.all_asset_types_for_project(project)

    def get_sequence(self, sequence, episode=None, project=None):
        """Resolve *sequence* to an id, or look it up by name.

        *episode*, when given, narrows the lookup by name.
        """
        project = self.get_project(project)
        sequence_id = self.get_id(sequence)
        if sequence_id:
            return sequence_id

        params = dict(project=project, sequence_name=sequence)
        if episode:
            params['episode'] = self.get_id(episode)
        return gazu.shot.get_sequence_by_name(**params)

    def get_sequences(self, project=None, episode=None):
        """Return the sequences of *episode* when given, else of *project*."""
        project = self.get_project(project)
        if episode:
            episode = self.get_episode(episode)
            return gazu.shot.all_sequences_for_episode(episode)
        return gazu.shot.all_sequences_for_project(project)

    def get_shot(self, shot, sequence, project=None):
        """Resolve *shot* to an id, or look it up by name in *sequence*.

        Returns ``None`` when the sequence cannot be resolved.
        """
        project = self.get_project(project)
        # get_sequence's second positional parameter is `episode`, so the
        # project has to be passed by keyword.
        sequence = self.get_sequence(sequence, project=project)
        if not sequence:
            return None

        shot_id = self.get_id(shot)
        if shot_id:
            return shot_id
        return gazu.shot.get_shot_by_name(sequence, shot)

    def get_shots(self, sequence):
        """Return ``gazu.shot.all_shots_for_sequence(sequence)``."""
        return gazu.shot.all_shots_for_sequence(sequence)

    def get_asset(self, asset, asset_type=None, project=None):
        """Resolve *asset* to an id, or look it up by name in *project*."""
        asset_id = self.get_id(asset)
        if asset_id:
            return asset_id

        project = self.get_project(project)
        asset_type = self.get_id(asset_type)
        return gazu.asset.get_asset_by_name(project, asset, asset_type)

    def get_assets(self, project=None):
        """Return the project's assets with an ``'asset_type'`` name added.

        Each asset dict is modified in place: its ``'entity_type_id'`` key is
        removed and replaced by the matching asset type name.
        """
        project = self.get_project(project)
        assets = gazu.asset.all_assets_for_project(project)
        type_names = {t['id']: t['name'] for t in self.get_asset_types(project)}
        for asset_data in assets:
            asset_data['asset_type'] = type_names[asset_data.pop('entity_type_id')]
        return assets

    def get_last_comment(self, task):
        """Return ``gazu.task.get_last_comment_for_task`` for *task*."""
        task = self.get_id(task)
        return gazu.task.get_last_comment_for_task(task)

    def get_last_comment_with_preview(self, task):
        """Return the first comment of *task* with previews, else ``None``.

        Comments are scanned in the order gazu returns them.
        """
        task = self.get_id(task)
        comments = gazu.task.all_comments_for_task(task)
        return next((c for c in comments if c['previews']), None)

    def download_preview_file(self, preview, filepath):
        """Download the preview file of *preview* to *filepath*."""
        preview_id = self.get_id(preview)
        return gazu.files.download_preview_file(preview_id, str(filepath))

    def get_shots_search_url(self, shot_names, project=None, episode=None):
        """Build the Kitsu shots-page URL searching for *shot_names*."""
        project = self.get_project(project)
        # get_project/get_episode hand back an id string, a dict or a plain
        # name depending on the input; reduce to an id whenever possible.
        project_ref = self.get_id(project) or project
        url = gazu.client.get_host().replace('/api', f'/productions/{project_ref}')

        if episode:
            episode = self.get_episode(episode)
            episode_ref = self.get_id(episode) or episode
            if episode_ref:
                url += f'/episodes/{episode_ref}'

        url += '/shots'
        return f'{url}?search={" ".join(shot_names)}'

    def get_task(self, task=None, entity=None):
        """Return the task of type *task* on *entity*, or ``None``.

        The returned dict gets an extra ``'last_comment'`` entry.
        """
        entity = self.get_id(entity)
        task_type = self.get_task_type(task)
        task = gazu.task.get_task_by_entity(entity, task_type)
        if not task:
            return None

        task['last_comment'] = self.get_last_comment(task)
        return task

    def set_main_preview(self, preview_data):
        """Forward *preview_data* to ``gazu.task.set_main_preview``."""
        gazu.task.set_main_preview(preview_data)

    def new_preview(self, task, comment, preview, set_main_preview=False):
        """Attach the file *preview* to *comment* on *task*.

        Returns what ``gazu.task.add_preview`` returned; when
        *set_main_preview* is true that value is also passed to
        :meth:`set_main_preview`.
        """
        task = self.get_id(task)
        comment = self.get_id(comment)
        preview_data = gazu.task.add_preview(
            task=task,
            comment=comment,
            preview_file_path=preview,
        )
        if set_main_preview:
            self.set_main_preview(preview_data)
        return preview_data

    def new_comment(self, task, status=None, comment='', preview=None, set_main_preview=False):
        """Post a comment on *task*, optionally with a preview file.

        When *status* is None the task's own ``'task_status_id'`` is reused,
        which requires *task* to be a dict in that case.

        Returns:
            The comment returned by ``gazu.task.add_comment``.
        """
        if status is None:
            status = {'id': task['task_status_id']}
        else:
            status = self.get_task_status(status)

        comment = gazu.task.add_comment(
            task=task,
            task_status=status,
            comment=comment,
        )

        if preview:
            self.new_preview(
                task=task,
                comment=comment,
                preview=str(preview),
                set_main_preview=set_main_preview,
            )
        return comment

    def remove_comment(self, comment):
        """Return ``gazu.task.remove_comment(comment)``."""
        return gazu.task.remove_comment(comment)

    def download_preview(self, preview_id, filepath):
        """Download the thumbnail of *preview_id* unless *filepath* exists.

        Missing parent directories are created first.
        """
        if isinstance(filepath, str):
            filepath = Path(filepath)
        if filepath.exists():
            return
        filepath.parent.mkdir(parents=True, exist_ok=True)
        gazu.files.download_preview_file_thumbnail(preview_id, filepath.as_posix())

    def new_task(self, entity, task_type, status=None, project=None):
        """Create a task of *task_type* on *entity*.

        *status* is resolved only when truthy; *project* is accepted but not
        used here.
        """
        task_type = self.get_task_type(task_type)
        if status:
            status = self.get_task_status(status)
        return gazu.task.new_task(entity, task_type=task_type, task_status=status)

    def new_asset(self, name, asset_type_name, project=None, description="", episode=None):
        """Create an asset called *name* of type *asset_type_name*.

        Raises:
            ValueError: if the asset type name is unknown to Kitsu.
        """
        project = self.get_project(project)
        project_dict = {'id': self.get_id(project)}

        asset_type = gazu.asset.get_asset_type_by_name(asset_type_name)
        if not asset_type:
            raise ValueError(f"Asset type '{asset_type_name}' not found in Kitsu")

        episode_dict = {'id': self.get_id(episode)} if episode else None
        return gazu.asset.new_asset(
            project_dict, asset_type, name,
            description=description, episode=episode_dict,
        )

    def new_episode(self, name, project=None):
        """Create an episode called *name* in *project*."""
        project = self.get_project(project)
        return gazu.shot.new_episode({'id': self.get_id(project)}, name)

    def update_project(self, project=None):
        """Refresh project data — no-op for now, caller should reload via load_projects."""
        pass

    def new_sequence(self, sequence, episode=None, project=None):
        """Create a sequence called *sequence*, attached to *episode* if any."""
        project = self.get_project(project)
        params = dict(name=sequence, project=project)

        episode = self.get_episode(episode)
        if episode:
            params['episode'] = episode
        return gazu.shot.new_sequence(**params)

    def new_shot(self, shot, sequence, nb_frames=None, frame_in=None,
                 frame_out=None, description='', custom_data=None, with_tasks=True, project=None):
        """Return the shot *shot* of *sequence*, creating it when missing.

        *frame_in* / *frame_out* are stored inside the shot's ``data`` dict
        (a *custom_data* dict passed by the caller is updated in place);
        *nb_frames* is sent as its own field. The creation payload is only
        used when the shot does not exist yet.
        """
        project = self.get_project(project)
        sequence = self.get_sequence(sequence)

        custom_data = custom_data or {}
        if frame_in is not None:
            custom_data["frame_in"] = frame_in
        if frame_out is not None:
            custom_data["frame_out"] = frame_out

        params = dict(name=shot, data=custom_data,
                      sequence_id=self.get_id(sequence), description=description)
        if nb_frames is not None:
            params["nb_frames"] = nb_frames

        shot = self.get_shot(shot=shot, sequence=sequence)
        if not shot:
            path = f"data/projects/{self.get_id(project)}/shots"
            shot = gazu.client.post(path, params)

        # NOTE(review): tasks are requested for pre-existing shots as well;
        # confirm this is the intended nesting of the with_tasks branch.
        if with_tasks:
            for task_type in self.get_shot_task_types(project=project):
                self.new_task(shot, task_type)

        return shot

    def update_data(self, entity, custom_data=None, name=None, description=None, frames=None, clear=False):
        """PUT a partial update of *entity* and return its resulting ``data``.

        Only truthy *name*, *description* and *frames* (sent as
        ``nb_frames``) are included. *custom_data* is merged over the
        entity's existing ``data`` unless *clear* is true, in which case it
        replaces it. Existing data is fetched from the server only when
        *entity* is not a dict and a merge is actually needed.
        """
        # None instead of a shared mutable default; also tolerates an
        # explicit custom_data=None together with clear=True.
        custom_data = custom_data or {}

        if isinstance(entity, dict):
            entity_id = entity['id']
            existing_data = entity.get('data') or {}
        else:
            entity_id = self.get_id(entity)
            existing_data = None

        payload = {}
        if name:
            payload['name'] = name
        if description:
            payload['description'] = description
        if frames:
            payload['nb_frames'] = frames

        if clear:
            payload['data'] = dict(custom_data)
        elif custom_data:
            if existing_data is None:
                existing_data = gazu.client.fetch_one('entities', entity_id).get('data') or {}
            payload['data'] = {**existing_data, **custom_data}

        entity_data = gazu.client.put(f"data/entities/{entity_id}", payload)
        return entity_data['data']

    def get_casting(self, shot, project=None):
        """Return ``gazu.casting.get_shot_casting`` for *shot* in *project*."""
        project = self.get_project(project)
        project_id = self.get_id(project)
        return gazu.casting.get_shot_casting({'id': self.get_id(shot), 'project_id': project_id})

    def update_casting(self, shot, casting, clear=True, project=None):
        """Replace (or extend, when *clear* is False) the casting of *shot*.

        Items of *casting* are either casting entries (dicts with
        ``'asset_id'`` and ``'nb_occurences'``) or assets in any form
        :meth:`get_asset` accepts, which count for one occurrence. An asset
        already present in the list gets its occurrence count incremented by
        one instead of being added twice.
        """
        project = self.get_project(project)
        shot_id = self.get_id(shot)

        norm_casting = []
        if clear is False:
            norm_casting += self.get_casting(shot, project)

        for asset in casting:
            if isinstance(asset, dict) and 'asset_id' in asset:  # It's an asset instance
                asset_id = asset['asset_id']
                nb_occurences = asset['nb_occurences']
            else:  # It's an asset
                # get_asset returns an id or an asset dict; reduce to the id
                # so the comparison below uses this item, not a stale value.
                asset_id = self.get_id(self.get_asset(asset))
                nb_occurences = 1
                if not asset_id:
                    print(f'Asset {asset} not found, skipping')
                    continue

            cast = next((c for c in norm_casting if c['asset_id'] == asset_id), None)
            if cast:
                cast['nb_occurences'] += 1
            else:
                norm_casting.append({'asset_id': asset_id, 'nb_occurences': nb_occurences})

        return gazu.casting.update_shot_casting(project, shot_id, norm_casting)

    def draw_prefs(self, layout):
        """Draw the connection settings in *layout*."""
        col = layout.column(align=False)
        col.prop(self, 'url', text='Url')
        col.prop(self, 'login', text='Login')
        col.prop(self, 'password', text='Password')
        col.separator()
        col.prop(self, 'admin_login', text='Admin Login')
        col.prop(self, 'admin_password', text='Admin Password')