vse_toolbox/operators/tracker.py

502 lines
18 KiB
Python

import os
from os.path import expandvars
from pathlib import Path
from pprint import pprint
import webbrowser
from functools import partial
import bpy
from bpy.types import (Operator, )
from bpy.props import (BoolProperty, EnumProperty, StringProperty)
from vse_toolbox.constants import (ASSET_PREVIEWS, PREVIEWS_DIR, ASSET_ITEMS)
from vse_toolbox.sequencer_utils import (get_strips, get_strip_render_path, get_strip_sequence_name,
render_strip, get_render_attributes)
from vse_toolbox.bl_utils import (get_addon_prefs, get_scene_settings)
from vse_toolbox.file_utils import (norm_name, expand)
from bpy.app.handlers import persistent
class VSETB_OT_tracker_connect(Operator):
bl_idname = "vse_toolbox.tracker_connect"
bl_label = "Connect to Tracker"
bl_description = "Connect to Tracker"
@classmethod
def poll(cls, context):
prefs = get_addon_prefs()
if prefs.tracker:
return True
def execute(self, context):
prefs = get_addon_prefs()
settings = get_scene_settings()
try:
prefs.tracker.connect()
self.report({'INFO'}, f'Successfully login to {settings.tracker_name.title()}')
return {"FINISHED"}
except Exception as e:
print('e: ', e)
self.report({'ERROR'}, f'Cannot connect to tracker, check login and password')
return {"CANCELLED"}
class VSETB_OT_load_assets(Operator):
bl_idname = "vse_toolbox.load_assets"
bl_label = "Load Assets for current projects"
bl_description = "Load Assets"
bl_options = {"REGISTER", "UNDO"}
@classmethod
def poll(cls, context):
settings = get_scene_settings()
if settings.active_project:
return True
def execute(self, context):
settings = get_scene_settings()
prefs = get_addon_prefs()
tracker = prefs.tracker
tracker.admin_connect()
project = settings.active_project
project.assets.clear()
assets = tracker.get_assets(project['id'])
if not assets:
self.report({'ERROR'}, f'No Assets found for {project.name}.')
for asset_data in assets:
asset = project.assets.add()
asset.name = asset_data['id']
asset.norm_name = norm_name(asset_data['name'], separator=' ', format=str.lower)
asset.tracker_name = asset_data['name']
asset.id = asset_data['id']
asset.asset_type = asset_data['asset_type']
#for key, value in asset_data.get('data', {}).items():
asset['metadata'] = asset_data.get('data', {})
preview_id = asset_data.get('preview_file_id')
if preview_id:
asset.preview = preview_id
preview_path = Path(PREVIEWS_DIR / project.id / preview_id).with_suffix('.png')
tracker.download_preview(preview_id, preview_path)
if preview_id not in ASSET_PREVIEWS:
ASSET_PREVIEWS.load(preview_id, preview_path.as_posix(), 'IMAGE', True)
set_asset_items()
self.report({'INFO'}, f'Assets for {project.name} successfully loaded')
return {"FINISHED"}
class VSETB_OT_load_projects(Operator):
bl_idname = "vse_toolbox.load_projects"
bl_label = "Load Projects"
bl_description = "Load Projects"
bl_options = {"REGISTER", "UNDO"}
@classmethod
def poll(cls, context):
return True
@staticmethod
def hex_to_rgb(hex_value):
print(hex_value)
b = (hex_value & 0xFF) / 255.0
g = ((hex_value >> 8) & 0xFF) / 255.0
r = ((hex_value >> 16) & 0xFF) / 255.0
return r, g, b
@staticmethod
def hex_to_rgb(color_str):
# supports '123456', '#123456' and '0x123456'
(r,g,b), a = map(lambda component: component / 255, bytes.fromhex(color_str[-6:])), 1.0
return (r,g,b,a)
def invoke(self, context, event):
self.ctrl = event.ctrl
return self.execute(context)
def execute(self, context):
settings = get_scene_settings()
prefs = get_addon_prefs()
tracker = prefs.tracker
prev_project_name = settings.project_name
tracker.admin_connect()
project_datas = tracker.get_projects()
for project_data in sorted(project_datas, key=lambda x: x['name']):
project = settings.projects.get(project_data['name'])
if not project:
project = settings.projects.add()
project.type = project_data['production_type'].upper().replace(' ', '')
project.name = project_data['name']
project.id = project_data['id']
if project.type == 'TVSHOW':
episode_datas = tracker.get_episodes(project_data)
for episode_data in episode_datas:
episode = project.episodes.get(episode_data['name'])
if not episode:
episode = project.episodes.add()
episode.name = episode_data['name']
episode.id = episode_data['id']
# Clear deleted episodes
ep_names = [e['name'] for e in episode_datas]
for ep in reversed(project.episodes):
if ep.name not in ep_names:
project.episodes.remove(list(project.episodes).index(ep))
else:
# Add sequences
sequences_data = tracker.get_sequences(project_data)
for sequence_data in sequences_data:
sequence = project.sequences.get(sequence_data['name'])
if not sequence:
sequence = project.sequences.add()
sequence.name = sequence_data['name']
sequence.id = sequence_data['id']
project.metadata_types.clear()
for metadata_data in tracker.get_metadata_types(project_data):
#pprint(metadata_data)
metadata_type = project.metadata_types.add()
metadata_type.name = metadata_data['name']
metadata_type.field_name = metadata_data['field_name']
metadata_type.data_type = metadata_data['data_type']
#metadata_type['choices'] = metadata_data['choices']
if prefs.sort_metadata_items:
metadata_data['choices'].sort()
for choice in metadata_data['choices']:
choice_item = metadata_type.choices.add()
choice_item.name = choice
metadata_type['entity_type'] = metadata_data['entity_type'].upper()
project.task_statuses.clear()
for status_data in tracker.get_task_statuses(project_data):
#print(metadata_data)
task_status = project.task_statuses.add()
task_status.name = status_data['short_name'].upper()
project.task_types.clear()
for task_type_data in tracker.get_shot_task_types(project_data):
task_type = project.task_types.add()
task_type.name = task_type_data['name']
task_type.id = task_type_data['id']
task_type.color = self.hex_to_rgb(task_type_data['color'])[:3]
project.set_shot_tasks()
project.asset_types.clear()
for asset_type_data in tracker.get_asset_types(project_data):
asset_type = project.asset_types.add()
asset_type.name = asset_type_data['name']
project.set_spreadsheet()
# Remove deleted projects
project_names = [p['name'] for p in project_datas]
for project in reversed(settings.projects):
if project.name not in project_names:
settings.projects.remove(list(settings.projects).index(project))
if self.ctrl or not settings.get('projects_loaded'):
bpy.ops.vse_toolbox.load_settings()
if prev_project_name != '/' and prev_project_name in settings.projects:
settings.project_name = prev_project_name
#if settings.active_project:
# settings.active_project.set_strip_metadata()
settings['projects_loaded'] = True
self.report({"INFO"}, 'Successfully Load Tracker Projects')
return {'FINISHED'}
class VSETB_OT_new_episode(Operator):
bl_idname = "vse_toolbox.new_episode"
bl_label = "New Episode"
bl_description = "Add new Episode to Project"
bl_options = {"REGISTER", "UNDO"}
episode_name : StringProperty(name="Episode Name", default="")
@classmethod
def poll(cls, context):
return True
def invoke(self, context, event):
scn = context.scene
settings = get_scene_settings()
return context.window_manager.invoke_props_dialog(self)
def execute(self, context):
settings = get_scene_settings()
prefs = get_addon_prefs()
tracker = prefs.tracker
episode_name = settings.episode_template.format(index=int(self.episode_name))
episode = tracker.get_episode(episode_name)
if episode:
self.report({'ERROR'}, f'Episode {episode_name} already exists')
return {"CANCELLED"}
tracker.new_episode(episode_name)
tracker.update_project()
self.report({'INFO'}, f'Episode {episode_name} successfully created')
return {'FINISHED'}
class VSETB_OT_upload_to_tracker(Operator):
bl_idname = "vse_toolbox.upload_to_tracker"
bl_label = "Upload to tracker"
bl_description = "Upload selected strip to tracker"
bl_options = {"REGISTER", "UNDO"}
@classmethod
def poll(cls, context):
return True
def invoke(self, context, event):
prefs = get_addon_prefs()
settings = get_scene_settings()
self.project = settings.active_project
tracker = prefs.tracker
tracker.connect()
#self.bl_label = f"Upload to {settings.tracker_name.title()}"
return context.window_manager.invoke_props_dialog(self, width=350)
def draw(self, context):
scn = context.scene
upload_to_tracker = self.project.upload_to_tracker
layout = self.layout
col = layout.column()
col.use_property_split = True
col.use_property_decorate = False
col.prop(upload_to_tracker, 'render_strips', text='Render Strips')
if upload_to_tracker.render_strips:
col.use_property_split = False
col.prop(upload_to_tracker, 'render_strip_template', text='')
col.use_property_split = True
col.separator()
col.prop(upload_to_tracker, 'task', text='Task')
col.prop(upload_to_tracker, 'status', text='Status')
col.prop(upload_to_tracker, 'comment', text='Comment')
row = col.row(heading='Add Preview')
row.prop(upload_to_tracker, 'add_preview', text='')
row.prop(upload_to_tracker, 'preview_mode', text='')
if upload_to_tracker.add_preview and not upload_to_tracker.render_strips:
col.use_property_split = False
col.prop(self.project, 'render_video_strip_template', text='')
col.use_property_split = True
col.separator()
col.prop(upload_to_tracker, 'custom_data', text='Custom Data')
col.prop(upload_to_tracker, 'update_frames', text='Update frames')
col.prop(upload_to_tracker, 'casting', text='Casting')
col.prop(upload_to_tracker, 'tasks_comment', text='Tasks Comment')
col.prop(upload_to_tracker, 'set_main_preview', text='Set Main Preview')
def execute(self, context):
#self.report({'ERROR'}, f'Export not implemented yet.')
prefs = get_addon_prefs()
settings = get_scene_settings()
tracker = prefs.tracker
upload_to_tracker = self.project.upload_to_tracker
render_attrs = get_render_attributes()
project_templates = {t.name: t.value for t in self.project.templates}
episode = None
if settings.active_episode:
episode = settings.active_episode.id
format_data = {**settings.format_data, **self.project.format_data}
status = upload_to_tracker.status
if status == 'CURRENT':
status = None
shot_strips = get_strips(channel='Shots', selected_only=True)
context.window_manager.progress_begin(0, len(shot_strips))
for i, strip in enumerate(shot_strips):
context.window_manager.progress_update(i)
strip_settings = strip.vsetb_strip_settings
sequence_name = get_strip_sequence_name(strip)
shot_name = strip.name
sequence = tracker.get_sequence(sequence_name, episode=episode)
metadata = strip_settings.metadata.to_dict(use_name=False)
#print(metadata)
if not sequence:
self.report({"INFO"}, f'Create sequence {sequence_name} in Kitsu')
sequence = tracker.new_sequence(sequence_name, episode=episode)
shot = tracker.get_shot(shot_name, sequence=sequence)
if not shot:
self.report({"INFO"}, f'Create shot {shot_name} in Kitsu')
shot = tracker.new_shot(shot_name, sequence=sequence)
task = tracker.get_task(upload_to_tracker.task, entity=shot)
if not task:
task = tracker.new_task(shot, task_type=upload_to_tracker.task)
preview = None
if upload_to_tracker.add_preview:
strip_data = {**format_data, **strip_settings.format_data}
if upload_to_tracker.render_strips:
preview_template = expand(upload_to_tracker.render_strip_template, **project_templates)
strip_data['ext'] = 'mov'
else:
preview_template = expand(self.project.render_video_strip_template, **project_templates)
preview = preview_template.format(**strip_data)
preview = Path(os.path.abspath(bpy.path.abspath(preview)))
if upload_to_tracker.render_strips:
render_strip(strip, preview, attributes=render_attrs)
#print(preview)
if not preview.exists():
print(f'The preview {preview} not exists')
preview = None
elif task.get('last_comment') and task['last_comment']['previews']:
if upload_to_tracker.preview_mode == 'REPLACE':
tracker.remove_comment(task['last_comment'])
elif upload_to_tracker.preview_mode == 'ONLY_NEW':
preview = None
comment_data = None
if status or upload_to_tracker.comment or preview:
comment_data = tracker.new_comment(task, comment=upload_to_tracker.comment, status=status)
if preview:
print('Upload preview from', preview)
preview_data = tracker.new_preview(
task=task,
comment=comment_data,
preview=preview)
if upload_to_tracker.set_main_preview:
bpy.app.timers.register(partial(tracker.set_main_preview, preview_data), first_interval=10)
params = {}
if upload_to_tracker.custom_data:
params['custom_data'] = metadata
params['description'] = strip_settings.description
if upload_to_tracker.update_frames:
params['frames'] = strip.frame_final_duration
if params:
tracker.update_data(shot, **params)
if upload_to_tracker.casting:
casting = [{'asset_id': a.id, 'nb_occurences': a.instance} for a in strip_settings.casting]
tracker.update_casting(shot, casting)
if upload_to_tracker.tasks_comment:
for task_type in self.project.task_types:
task = getattr(strip_settings.tasks, norm_name(task_type.name))
tracker_task = tracker.get_task(task_type.name, entity=shot)
if task.comment and tracker_task.get('last_comment') != task.comment:
tracker.new_comment(tracker_task, comment=task.comment)
context.window_manager.progress_end()
return {"FINISHED"}
class VSETB_OT_open_shot_on_tracker(Operator):
bl_idname = "vse_toolbox.open_shot_on_tracker"
bl_label = "Open Shot on Tracker"
bl_description = "Open Shot on Tracker"
@classmethod
def poll(cls, context):
if not get_strips(channel='Shots', selected_only=True):
cls.poll_message_set('No active Shots strip')
return
prefs = get_addon_prefs()
if prefs.tracker:
return True
def execute(self, context):
prefs = get_addon_prefs()
tracker = prefs.tracker
strips = get_strips(channel='Shots', selected_only=True)
url = tracker.get_shots_search_url([s.name for s in strips])
webbrowser.open_new_tab(url)
return {"FINISHED"}
@persistent
def set_asset_items(scene=None):
ASSET_ITEMS.clear()
settings = get_scene_settings()
if settings.active_project:
ASSET_ITEMS.extend([(a.id, a.label, '', i) for i, a in enumerate(settings.active_project.assets)])
classes = (
VSETB_OT_load_assets,
VSETB_OT_load_projects,
VSETB_OT_new_episode,
VSETB_OT_tracker_connect,
VSETB_OT_upload_to_tracker,
VSETB_OT_open_shot_on_tracker
)
def register():
if not bpy.app.background:
bpy.app.handlers.load_post.append(set_asset_items)
for cls in classes:
bpy.utils.register_class(cls)
def unregister():
for cls in reversed(classes):
bpy.utils.unregister_class(cls)
if not bpy.app.background:
bpy.app.handlers.load_post.remove(set_asset_items)