from asset_library.common.functions import (read_catalog, write_catalog, norm_asset_datas) from asset_library.common.bl_utils import get_addon_prefs, load_datablocks from asset_library.common.file_utils import read_file, write_file from asset_library.common.template import Template from asset_library.constants import (PREVIEW_ASSETS_SCRIPT, MODULE_DIR) from asset_library import (action, collection, file) from bpy.types import PropertyGroup from bpy.props import StringProperty import bpy from itertools import groupby from pathlib import Path import shutil import os import json import uuid import time from functools import partial import subprocess class AssetLibraryAdapter(PropertyGroup): #def __init__(self): name = "Base Adapter" #library = None #bundle_directory : StringProperty() @property def library(self): prefs = self.addon_prefs for lib in prefs.libraries: if lib.adapter == self: return lib if lib.conform.adapter == self: return lib #@property #def library_path(self): # return self.library.library_path @property def is_conform(self): prefs = self.addon_prefs for lib in prefs.libraries: if lib.adapter == self: return False if lib.conform.adapter == self: return True @property def target_directory(self): if self.is_conform: return self.library.conform.directory return self.library.bundle_dir @property def blend_depth(self): if self.is_conform: return self.library.conform.blend_depth return self.library.blend_depth @property def template_image(self): return Template(self.library.conform.template_image) @property def template_video(self): return Template(self.library.conform.template_video) @property def template_description(self): return Template(self.library.conform.template_description) @property def data_type(self): return self.library.data_type @property def data_types(self): return self.library.data_types #@property #def externalize_data(self): # return self.library.externalize_data #@property #def catalog_path(self): # return self.library.catalog_path def get_catalog_path(self, directory=None): directory = directory or self.target_directory return Path(directory, 'blender_assets.cats.txt') @property def cache_file(self): return Path(self.target_directory) / f"blender_assets.{self.library.id}.json" #return get_asset_datas_file(self.library_path) @property def diff_file(self): return Path(bpy.app.tempdir, 'diff.json') @property def preview_blend(self): return MODULE_DIR / self.data_type.lower() / "preview.blend" @property def preview_assets_file(self): return Path(bpy.app.tempdir, "preview_assets_file.json") @property def addon_prefs(self): return get_addon_prefs() @property def module_type(self): lib_type = self.library.data_type if lib_type == 'ACTION': return action elif lib_type == 'FILE': return file elif lib_type == 'COLLECTION': return collection def to_dict(self): return {p: getattr(self, p) for p in self.bl_rna.properties.keys() if p !='rna_type'} def fetch(self): raise Exception('This method need to be define in the adapter') def norm_file_name(self, name): return name.replace(' ', '_') def read_file(self, file): return read_file(file) def write_file(self, file, data): return write_file(file, data) def copy_file(self, source, destination): src = Path(source) dst = Path(destination) if not source.exists(): print(f'Cannot copy file {source}: file not exist') return dst.parent.mkdir(exist_ok=True, parents=True) if src == dst: print(f'Cannot copy file {source}: source and destination are the same') return print(f'Copy file from {source} to {destination}') shutil.copy2(str(source), str(destination)) def load_datablocks(self, src, names=None, type='objects', link=True, expr=None): """Link or append a datablock from a blendfile""" return load_datablocks(src, names=names, type=type, link=link, expr=expr) def get_asset_relative_path(self, name, catalog): '''Get a relative path for the asset''' name = self.norm_file_name(name) return Path(catalog, name, name).with_suffix('.blend') #def _get_file_name(self, name, filepath): # '''Ensure having a unique name per asset if in the same folder by prefixing with the blend_file name''' # file_name = name # if filepath.stem != name: # file_name = f'{file_name}_{name}' # # return file_name def get_active_asset_library(self): asset_handle = bpy.context.asset_file_handle prefs = get_addon_prefs() asset_handle = bpy.context.asset_file_handle lib = None if '.library_id' in asset_handle.asset_data: lib_id = asset_handle.asset_data['.library_id'] lib = next((l for l in prefs.libraries if l.id == lib_id), None) if not lib: print(f"No library found for id {lib_id}") if not lib: lib = self return lib def get_active_asset_path(self): '''Get the full path of the active asset_handle from the asset brower''' prefs = get_addon_prefs() asset_handle = bpy.context.asset_file_handle lib = self.get_active_asset_library() if 'filepath' in asset_handle.asset_data: asset_path = asset_handle.asset_data['filepath'] asset_path = lib.adapter.format_path(asset_path) else: asset_path = bpy.types.AssetHandle.get_full_library_path( asset_handle, bpy.context.asset_library_ref ) return asset_path def get_template_path(self, template, name, asset_path, catalog): if template.startswith('.'): #the template is relative template = Path(asset_path, template).as_posix() params = { 'name': name, 'asset_path': Path(asset_path), 'catalog': catalog, 'catalog_name': catalog.replace('/', '_'), } return self.format_path(template, **params) def get_description_path(self, name, asset_path, catalog) -> Path: """"Get the path of the json or yaml describing all assets data in one file""" return self.get_template_path(self.library.conform.template_description, name, asset_path, catalog) def get_image_path(self, name, asset_path, catalog) -> Path: return self.get_template_path(self.library.conform.template_image, name, asset_path, catalog) def get_video_path(self, name, asset_path, catalog) -> Path: return self.get_template_path(self.library.conform.template_video, name, asset_path, catalog) ''' def get_path(self, type, name, asset_path, template=None) -> Path: if not template: template = getattr(self, f'{type}_template') if isinstance(template, str): template = Template(template) filepath = Path(asset_path) params = { 'bundle_dir': self.library.bundle_directory, 'conform_dir': self.library.conform.directory, 'rel_path': '', 'catalog':'', 'catalog_name':'', 'name': name } return self.format_path(template, params)#(filepath / template.format(name=name, path=Path(asset_path))).resolve() #def get_image_path(self, name, asset_path): # filepath = Path(asset_path) # image_name = self._get_file_name(name, asset_path) # return (filepath / self.template_image.format(name=image_name)).resolve() def get_cache_image_path(self, name, catalog) -> Path: """"Get the the cache path of a image for asset without an externalized image""" name = self.norm_file_name(name) return Path(self.library_path, '.previews', f"{catalog.replace('/', '_')}_{name}").with_suffix(('.png')) def get_cache_image(self, name, catalog): cache_image_path = self.get_cache_image_path(name, catalog) if cache_image_path.exists(): return cache_image_path ''' #def get_video_path(self, name, asset_path): # filepath = Path(asset_path) # video_name = self._get_file_name(name, asset_path) # return (filepath / self.template_video.format(name=video_name)).resolve() ''' def get_image(self, name, asset_path): image_path = self.get_path('image', name, asset_path) if image_path.exists(): return image_path def get_video(self, name, asset_path): video_path = self.get_path('video', name, asset_path) if video_path.exists(): return video_path ''' def read_asset_description_file(self, asset_path) -> dict: """Read the description file of the asset""" description_path = self.get_description_path(asset_path) return self.read_file(description_path) def write_description_file(self, asset_data, asset_path) -> None: description_path = self.get_description_path(asset_path) return write_file(description_path, asset_data) def write_asset(self, asset, asset_path): bpy.data.libraries.write( str(asset_path), {asset}, path_remap="NONE", fake_user=True, compress=True ) def read_catalog(self, directory=None): """Read the catalog file of the library target directory or of the specified directory""" catalog_path = self.get_catalog_path(directory) cat_data = {} for line in catalog_path.read_text(encoding="utf-8").split('\n'): if line.startswith(('VERSION', '#')) or not line: continue cat_id, cat_path, cat_name = line.split(':') cat_data[cat_path] = {'id':cat_id, 'name':cat_name} return cat_data def write_catalog(self, catalog_data, directory=None): """Write the catalog file in the library target directory or of the specified directory""" catalog_path = self.get_catalog_path(directory) lines = ['VERSION 1', ''] # Add missing parents catalog norm_data = {} for cat_path, cat_data in catalog_data.items(): norm_data[cat_path] = cat_data for p in Path(cat_path).parents[:-1]: if p in data or p in norm_data: continue norm_data[p.as_posix()] = {'id': str(uuid.uuid4()), 'name': '-'.join(p.parts)} for cat_path, cat_data in sorted(norm_data.items()): cat_name = cat_data['name'].replace('/', '-') lines.append(f"{cat_data['id']}:{cat_path}:{cat_name}") print(f'Catalog writen at: {catalog_path}') catalog_path.write_text('\n'.join(lines), encoding="utf-8") def read_cache(self): return self.read_file(self.cache_file) def norm_asset_datas(self, asset_file_datas): ''' Return a new flat list of asset data the filepath keys are merge with the assets keys ''' return norm_asset_datas(asset_file_datas) def write_cache(self, asset_datas): path = self.cache_file print(f'cache file writen to {path}') return write_file(path, list(asset_datas)) def prop_rel_path(self, path, prop): '''Get a filepath relative to a property of the adapter''' field_prop = '{%s}/'%prop prop_value = getattr(self, prop) prop_value = Path(os.path.expandvars(prop_value)).resolve() rel_path = Path(path).resolve().relative_to(prop_value).as_posix() return field_prop + rel_path def write_preview(self, preview, filepath): if not preview or not filepath: return filepath = Path(filepath) filepath.parent.mkdir(parents=True, exist_ok=True) img_size = preview.image_size px = [0] * img_size[0] * img_size[1] * 4 preview.image_pixels_float.foreach_get(px) img = bpy.data.images.new(name=filepath.name, width=img_size[0], height=img_size[1], is_data=True, alpha=True) img.pixels.foreach_set(px) img.filepath_raw = str(filepath.with_suffix('.png')) img.file_format = 'PNG' img.save() def draw_header(self, layout): """Draw the header of the Asset Browser Window""" #layout.separator() self.module_type.gui.draw_header(layout) def draw_context_menu(self, layout): """Draw the context menu of the Asset Browser Window""" #layout.separator() self.module_type.gui.draw_context_menu(layout) def group_key(self, asset_data): """Key used to group assets inside one blend""" catalog_parts = asset_data['catalog'].split('/') + [asset_data['name']] return catalog_parts[:self.blend_depth] #def transfert_preview(self, ) ''' def generate_previews(self, assets, callback): def _generate_previews(assets, callback, src_assets=None): if src_assets: src_assets = [] if bpy.app.is_job_running('RENDER_PREVIEW'): print("Waiting for render...") return 0.2 # waiting time while assets: # generate next preview asset = assets.pop() #print(f"Creating preview for world {world.name}...") asset_path = asset.asset_data['filepath'] src_asset = self.load_datablocks(asset_path, names=asset.name, link=False, type=self.data_types) if not src_asset: #print(f'No asset named {asset.name} in {asset_path]}') return src_assets.append(src_asset) # # set image in the preview object's material # obj = bpy.context.active_object # image = world.node_tree.nodes['Environment Texture'].image # obj.material_slots[0].material.node_tree.nodes['Image Texture'].image = image if self.data_type == 'COLLECTION': asset.children.link(src_asset) # start preview render with bpy.context.temp_override(id=asset): bpy.ops.ed.lib_id_generate_preview() return 0.2 for asset in src_asset: asset.user_clear() bpy.ops.outliner.orphans_purge(do_local_ids=True, do_linked_ids=True, do_recursive=True) callback() return None assets = assets.copy() # create preview images bpy.app.timers.register( functools.partial( _generate_previews, assets, callback ) ) ''' def generate_preview(self, asset_description): """Only generate preview when conforming a library""" #print('generate_preview', filepath, asset_names, data_type) scn = bpy.context.scene #Creating the preview for collection, object or material camera = scn.camera vl = bpy.context.view_layer data_type = self.data_type #asset_description['data_type'] asset_path = asset_description['filepath'] asset_data_names = {} for asset_data in asset_description['assets']: name = asset_data['name'] catalog = asset_data['catalog'] image_path = self.get_image_path(name, asset_path, catalog) if image_path.exists(): continue #Store in a dict all asset_data that does not have preview asset_data_names[name] = dict(asset_data, image_path=image_path) if not asset_data_names: # No preview to generate return asset_names = list(asset_data_names.keys()) assets = self.load_datablocks(asset_path, names=asset_names, link=True, type=data_type) for asset in assets: if not asset: continue asset_data = asset_data_names[asset.name] image_path = asset_data['image_path'] if data_type == 'COLLECTION': bpy.ops.object.collection_instance_add(name=asset.name) bpy.ops.view3d.camera_to_view_selected() instance = vl.objects.active #scn.collection.children.link(asset) scn.render.filepath = str(image_path) print(f'Render asset {asset.name} to {image_path}') bpy.ops.render.render(write_still=True) #instance.user_clear() asset.user_clear() bpy.data.objects.remove(instance) #bpy.ops.object.delete(use_global=False) #scn.collection.children.unlink(asset) bpy.ops.outliner.orphans_purge(do_local_ids=True, do_linked_ids=True, do_recursive=True) def generate_previews(self): cache = self.fetch() #cache_diff.sort(key=lambda x :x['filepath']) #blend_groups = groupby(cache_diff, key=lambda x :x['filepath']) #TODO Support all multiple data_type for asset_description in cache: self.generate_preview(asset_description) # filepath = asset_description['filepath'] # asset_datas = asset_description["assets"] # asset_datas.sort(key=lambda x :x.get('type', self.data_type)) # data_type_groups = groupby(asset_datas, key=lambda x :x.get('type', self.data_type)) # for data_type, same_type_asset_datas in data_type_groups: # asset_names = [a['name'] for a in same_type_asset_datas] # self.generate_preview(filepath, asset_names, data_type) def set_asset_preview(self, asset, asset_data): '''Load an externalize image as preview for an asset''' image_path = Path(asset_data['image']) if not image_path.is_absolute(): image_path = Path(asset_data['filepath'], image_path) image_path = self.format_path(image_path.as_posix()) if image_path and image_path.exists(): with bpy.context.temp_override(id=asset): bpy.ops.ed.lib_id_load_custom_preview( filepath=str(image_path) ) if asset.preview: return asset.preview return #Creating the preview for collection, object or material src_asset = self.load_datablocks(asset_data['filepath'], names=asset_data['name'], link=False, type=self.data_types) if not src_asset: print(f'No asset named {asset_data["name"]} in {asset_data["filepath"]}') return if not self.data_type == 'COLLECTION': print(f'Generate preview of type {self.data_type} not supported yet') return # asset.children.link(src_asset) # bpy.ops.ed.lib_id_generate_preview({"id": asset}) # while bpy.app.is_job_running("RENDER_PREVIEW"): # print(bpy.app.is_job_running("RENDER_PREVIEW")) # time.sleep(0.2) # getattr(bpy.data, self.data_types).remove(src_asset) # return asset.preview #src_asset.user_clear() #return src_asset #asset.children.unlink(src_asset) #getattr(bpy.data, self.data_types).remove(src_asset) # time.sleep(1) # #Transfering pixels between previews # w, h = src_asset.preview.image_size # pixels = [0] * (w*h*4) # src_asset.preview.image_pixels_float.foreach_get(pixels) # asset.preview_ensure() # asset.preview.image_size = src_asset.preview.image_size # asset.preview.image_pixels_float.foreach_set(pixels) #print('pixels transfered') #bpy.app.timers.register(partial(getattr(bpy.data, self.data_types).remove, src_asset), first_interval=1) def set_asset_catalog(self, asset, asset_data, catalog_data): """Find the catalog if already exist or create it""" catalog_name = asset_data['catalog'] catalog = catalog_data.get(catalog_name) if not catalog: catalog = {'id': str(uuid.uuid4()), 'name': catalog_name} catalog_data[catalog_name] = catalog asset.asset_data.catalog_id = catalog['id'] def set_asset_metadata(self, asset, asset_data): """Create custom prop to an asset base on provided data""" metadata = asset_data.get('metadata', {}) library_id = self.library.id if 'library_id' in asset_data: library_id = asset_data['library_id'] metadata['.library_id'] = library_id metadata['filepath'] = asset_data['filepath'] for k, v in metadata.items(): asset.asset_data[k] = v def set_asset_tags(self, asset, asset_data): """Create asset tags base on provided data""" tags = asset_data.get('tags', []) if tags: #Clear all tags first for tag in asset.asset_data.tags[:]: asset.asset_data.tags.remove(tag) for tag in tags: if not tag: continue asset.asset_data.tags.new(tag, skip_if_exists=True) def set_asset_description(self, asset, asset_data): """Set asset description base on provided data""" asset.asset_data.description = asset_data.get('description', '') def bundle(self, cache_diff=None): """Group all new assets in one or multiple blends for the asset browser""" if self.data_type not in ('FILE', 'ACTION', 'COLLECTION'): print(f'{self.data_type} is not supported yet') return target_dir = self.target_directory catalog_data = self.read_catalog() #TODO remove unused catalog write_cache = False if not cache_diff: # Get list of all modifications cache, cache_diff = self.diff() # Only write complete cache at the end write_cache = True self.generate_previews() elif isinstance(cache_diff, (Path, str)): cache_diff = json.loads(Path(cache_diff).read_text(encoding='utf-8')) if self.blend_depth == 0: raise Exception('Blender depth must be 1 at min') #groups = [(cache_diff)] else: cache_diff.sort(key=self.group_key) groups = groupby(cache_diff, key=self.group_key) total_assets = len(cache_diff) print(f'total_assets={total_assets}') if total_assets == 0: print('No assets found') return i = 0 #assets_to_preview = [] for sub_path, asset_datas in groups: blend_name = sub_path[-1].replace(' ', '_').lower() blend_path = Path(target_dir, *sub_path, blend_name).with_suffix('.blend') if blend_path.exists(): print(f'Opening existing bundle blend: {blend_path}') bpy.ops.wm.open_mainfile(filepath=str(blend_path)) else: print(f'Create new bundle blend to: {blend_path}') bpy.ops.wm.read_homefile(use_empty=True) for asset_data in asset_datas: if total_assets <= 100 or i % int(total_assets / 10) == 0: print(f'Progress: {int(i / total_assets * 100)+1}') operation = asset_data.get('operation', 'ADD') asset = getattr(bpy.data, self.data_types).get(asset_data['name']) if operation == 'REMOVE': if asset: getattr(bpy.data, self.data_types).remove(asset) else: print(f'ERROR : Remove Asset: {asset_data["name"]} not found in {blend_path}') continue if operation == 'MODIFY' and not asset: print(f'WARNING: Modifiy Asset: {asset_data["name"]} not found in {blend_path} it will be created') elif operation == 'ADD' or not asset: if asset: #raise Exception(f"Asset {asset_data['name']} Already in Blend") print(f"Asset {asset_data['name']} Already in Blend") getattr(bpy.data, self.data_types).remove(asset) print(f"INFO: Add new asset: {asset_data['name']}") asset = getattr(bpy.data, self.data_types).new(name=asset_data['name']) else: print(f'operation {operation} not supported should be in (ADD, REMOVE, MODIFIED)') continue asset.asset_mark() self.set_asset_preview(asset, asset_data) #if not asset_preview: # assets_to_preview.append((asset_data['filepath'], asset_data['name'], asset_data['data_type'])) #if self.externalize_data: # self.write_preview(preview, filepath) self.set_asset_catalog(asset, asset_data, catalog_data) self.set_asset_metadata(asset, asset_data) self.set_asset_tags(asset, asset_data) self.set_asset_description(asset, asset_data) i += 1 #self.write_asset_preview_file() print(f'Saving Blend to {blend_path}') #blend_path.parent.mkdir(exist_ok=True, parents=True) #bpy.ops.wm.save_as_mainfile(filepath=str(blend_path), compress=True) #if write_cache: # self.write_cache(cache) self.write_catalog(catalog_data) bpy.ops.wm.quit_blender() def norm_cache(self, cache): """ Return a new flat list of asset data the filepath keys are merge with the assets keys""" new_cache = [] for asset_description in cache: asset_description = asset_description.copy() if 'assets' in asset_description: assets = asset_description.pop('assets') for asset_data in assets: new_cache.append({**asset_description, **asset_data}) else: new_cache.append(asset_description) return new_cache def diff(self): """Compare the library cache with it current state and return the difference""" cache = self.read_cache() if cache is None: print(f'Fetch The library {self.library.name} for the first time, might be long...') cache = [] new_cache = self.fetch() #print(cache) cache = {f"{a['filepath']}/{a['name']}": a for a in self.norm_cache(cache)} new_cache = {f"{a['filepath']}/{a['name']}" : a for a in self.norm_cache(new_cache)} assets_added = [v for k, v in new_cache.items() if k not in cache] assets_removed = [v for k, v in cache.items() if k not in new_cache] assets_modified = [v for k, v in cache.items() if v not in assets_removed and v!= new_cache[k]] if assets_added: print(f'{len(assets_added)} Assets Added \n{tuple(a["name"] for a in assets_added[:10])}\n') if assets_removed: print(f'{len(assets_removed)} Assets Removed \n{tuple(a["name"] for a in assets_removed[:10])}\n') if assets_modified: print(f'{len(assets_modified)} Assets Modified \n{tuple(a["name"] for a in assets_modified[:10])}\n') assets_added = [dict(a, operation='ADD') for a in assets_added] assets_removed = [dict(a, operation='REMOVE') for a in assets_removed] assets_modified = [dict(a, operation='MODIFY') for a in assets_modified] cache_diff = assets_added + assets_removed + assets_modified if not cache_diff: print('No change in the library') return new_cache, cache_diff def draw_prefs(self, layout): """Draw the options in the addon preference for this adapter""" annotations = self.__class__.__annotations__ for k, v in annotations.items(): layout.prop(self, k, text=bpy.path.display_name(k)) def format_path(self, template, **kargs): params = dict(self.to_dict(), bundle_dir=Path(self.library.bundle_directory), conform_dir=Path(self.library.conform.directory), **kargs ) return Template(template).format(params).resolve()