import os
import shutil
import json
import uuid
import time
import subprocess
from pathlib import Path
from itertools import groupby
from functools import partial
from glob import glob
from copy import deepcopy

import bpy
from bpy_extras import asset_utils
from bpy.types import PropertyGroup
from bpy.props import StringProperty

#from asset_library.common.functions import (norm_asset_datas,)
from ..core.bl_utils import get_addon_prefs, load_datablocks
from ..core.file_utils import read_file, write_file
from ..core.template import Template
from ..constants import (MODULE_DIR, RESOURCES_DIR)
from ..data_type import (action, collection, file)

#from asset_library.common.library_cache import LibraryCacheDiff


class LibraryPlugin(PropertyGroup):
    """Base property group shared by the library plugins.

    Most accessors delegate to the library that owns this plugin (see the
    ``library`` property). Concrete plugins are expected to override the
    hooks that raise ``NotImplementedError`` here: ``fetch``,
    ``generate_previews``, ``get_image_path``, ``get_video_path``,
    ``new_asset``, ``remove_asset`` and ``set_asset_preview``.
    """

    #def __init__(self):

    #name = "Base Adapter"
    #library = None

    @property
    def library(self):
        """Library of the addon preferences whose ``plugin`` is this object.

        Returns ``None`` when no library references this plugin.
        """
        libraries = self.addon_prefs.libraries
        return next((lib for lib in libraries if lib.plugin == self), None)

    @property
    def bundle_directory(self):
        """``library_path`` of the owning library."""
        return self.library.library_path

    @property
    def data_type(self):
        """``data_type`` of the owning library."""
        return self.library.data_type

    @property
    def data_types(self):
        """``data_types`` of the owning library.

        ``bundle`` uses it as the attribute name of a ``bpy.data`` collection.
        """
        return self.library.data_types

    # def get_catalog_path(self, directory=None):
    #     directory = directory or self.bundle_directory
    #     return Path(directory, 'blender_assets.cats.txt')

    # @property
    # def cache_file(self):
    #     return Path(self.bundle_directory) / f"blender_assets.{self.library.id}.json"

    # @property
    # def tmp_cache_file(self):
    #     return Path(bpy.app.tempdir) / f"blender_assets.{self.library.id}.json"

    # @property
    # def diff_file(self):
    #     return Path(bpy.app.tempdir, 'diff.json')

    @property
    def preview_blend(self):
        """Path ``MODULE_DIR/<data type in lower case>/preview.blend``."""
        return MODULE_DIR / self.data_type.lower() / "preview.blend"

    @property
    def preview_assets_file(self):
        """Path of ``preview_assets_file.json`` in Blender's temp directory."""
        return Path(bpy.app.tempdir, "preview_assets_file.json")

    @property
    def addon_prefs(self):
        """Addon preferences, as returned by ``get_addon_prefs``."""
        return get_addon_prefs()

    @property
    def module_type(self):
        """``data_type`` module matching the library data type.

        Returns ``None`` for any data type other than 'ACTION', 'FILE' or
        'COLLECTION'.
        """
        modules = {
            'ACTION': action,
            'FILE': file,
            'COLLECTION': collection,
        }
        return modules.get(self.library.data_type)

    @property
    def format_data(self):
        """Dict for formatting templates.

        Contains the plugin properties (see ``to_dict``) plus the library
        ``bundle_dir`` and ``parent``.
        """
        library = self.library
        return dict(
            self.to_dict(),
            bundle_dir=library.bundle_dir,
            parent=library.parent,
        )

    def to_dict(self):
        """Return the RNA properties of the plugin as a dict, 'rna_type' excluded."""
        return {
            p: getattr(self, p)
            for p in self.bl_rna.properties.keys()
            if p != 'rna_type'
        }

    def read_catalog(self):
        """Delegate to ``library.read_catalog``."""
        return self.library.read_catalog()

    def read_cache(self, filepath=None):
        """Delegate to ``library.read_cache`` with the given ``filepath``."""
        return self.library.read_cache(filepath=filepath)

    def fetch(self):
        """Hook to override in the plugin.

        ``bundle`` calls ``diff()`` and ``write()`` on the returned object.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError('This method need to be define in the plugin')

    def norm_file_name(self, name):
        """Return ``name`` with spaces replaced by underscores."""
        return name.replace(' ', '_')

    def read_file(self, file):
        """Read ``file`` with the ``read_file`` helper of ``core.file_utils``."""
        return read_file(file)

    def write_file(self, file, data):
        """Write ``data`` to ``file`` with the ``write_file`` helper of ``core.file_utils``."""
        return write_file(file, data)

    def copy_file(self, source, destination):
        """Copy ``source`` to ``destination``, creating the parent directories.

        Nothing is copied (a message is printed instead) when the source does
        not exist or when both paths are the same.
        """
        src = Path(source)
        dst = Path(destination)

        if not src.exists():
            print(f'Cannot copy file {src}: file not exist')
            return

        # Checked before touching the file system: nothing has to be created
        # when there is nothing to copy.
        if src == dst:
            print(f'Cannot copy file {src}: source and destination are the same')
            return

        dst.parent.mkdir(exist_ok=True, parents=True)

        print(f'Copy file from {src} to {dst}')
        shutil.copy2(str(src), str(dst))

    def load_datablocks(self, src, names=None, type='objects', link=True, expr=None, assets_only=False):
        """Link or append a datablock from a blendfile.

        An upper case ``type`` (e.g. 'COLLECTION') is converted to its lower
        case form with an 's' appended (e.g. 'collections') before the call is
        forwarded to the ``load_datablocks`` helper of ``core.bl_utils``.
        """
        if type.isupper():
            type = f'{type.lower()}s'

        return load_datablocks(
            src,
            names=names,
            type=type,
            link=link,
            expr=expr,
            assets_only=assets_only,
        )

    def get_asset_data(self, asset):
        """Extract asset information from a datablock into a dict."""
        asset_data = asset.asset_data
        return dict(
            name=asset.name,
            type=asset.bl_rna.name.upper(),
            author=asset_data.author,
            tags=list(asset_data.tags.keys()),
            metadata=dict(asset_data),
            description=asset_data.description,
        )

    def get_asset_relative_path(self, name, catalog):
        """Get a relative path for the asset: ``<catalog>/<name>/<name>.blend``.

        ``name`` is normalized with ``norm_file_name`` first.
        """
        name = self.norm_file_name(name)
        return Path(catalog, name, name).with_suffix('.blend')

    def get_active_asset_library(self):
        """Return the library referenced by the active asset of the asset browser.

        The library is looked up from the '.library_id' custom property of the
        active asset handle. ``self`` (the plugin) is returned when there is no
        active handle, no such property, or no library with that id.
        """
        prefs = get_addon_prefs()
        asset_handle = bpy.context.asset_file_handle

        if not asset_handle:
            return self

        lib = None
        if '.library_id' in asset_handle.asset_data:
            lib_id = asset_handle.asset_data['.library_id']
            lib = next((l for l in prefs.libraries if l.id == lib_id), None)
            if not lib:
                print(f"No library found for id {lib_id}")

        if not lib:
            lib = self

        return lib

    def get_active_asset_path(self):
        """Get the full path of the active asset_handle from the asset browser.

        Returns ``None`` when there is no active asset handle.
        """
        asset_handle = bpy.context.asset_file_handle
        if not asset_handle:
            # Without this guard the lookups below raise AttributeError.
            return None

        if 'filepath' in asset_handle.asset_data:
            lib = self.get_active_asset_library()
            # get_active_asset_library() falls back to the plugin itself,
            # which has no `plugin` attribute.
            plugin = getattr(lib, 'plugin', lib)
            return plugin.format_path(asset_handle.asset_data['filepath'])

        return bpy.types.AssetHandle.get_full_library_path(
            asset_handle, bpy.context.asset_library_ref
        )

    def generate_previews(self):
        """Hook to override in the plugin."""
        raise NotImplementedError('Need to be defined in the plugin')

    def get_image_path(self, name, catalog, filepath):
        """Hook to override in the plugin."""
        raise NotImplementedError('Need to be defined in the plugin')

    def get_video_path(self, name, catalog, filepath):
        """Hook to override in the plugin."""
        raise NotImplementedError('Need to be defined in the plugin')

    def new_asset(self, asset, asset_cache):
        """Hook to override in the plugin."""
        raise NotImplementedError('Need to be defined in the plugin')

    def remove_asset(self, asset, asset_cache):
        """Hook to override in the plugin."""
        raise NotImplementedError('Need to be defined in the plugin')

    def set_asset_preview(self, asset, asset_cache):
        """Hook to override in the plugin. Called by ``bundle`` for each asset."""
        raise NotImplementedError('Need to be defined in the plugin')

    def format_asset_data(self, data):
        """Get a dict for use in template fields.

        ``data`` must provide the 'name', 'filepath' and 'catalog' keys.
        """
        catalog = data['catalog']
        return {
            'asset_name': data['name'],
            'asset_path': Path(data['filepath']),
            'catalog': catalog,
            'catalog_name': catalog.replace('/', '_'),
        }

    def format_path(self, template, data=None, **kargs):
        """Format ``template`` with the asset data and the plugin data.

        Args:
            template: template string; a falsy value returns ``None``.
            data: optional asset data; when given it is merged with ``kargs``
                and converted with ``format_asset_data``.
            **kargs: extra fields, used as the only fields when ``data`` is
                empty.

        Returns:
            The result of ``Template(template).format(...).resolve()``, or
            ``None`` when ``template`` is falsy.
        """
        if not template:
            return None

        if data:
            fields = self.format_asset_data(dict(data, **kargs))
        else:
            fields = kargs

        if template.startswith('.'):  # the template is relative
            template = Path(fields['asset_path'], template).as_posix()

        params = dict(**fields, **self.format_data)

        return Template(template).format(params).resolve()

    def find_path(self, template, data, **kargs):
        """Format ``template`` and return the first path matching it as a glob.

        Returns ``None`` when nothing matches.
        """
        pattern = self.format_path(template, data, **kargs)
        matches = glob(str(pattern))
        if matches:
            return Path(matches[0])
        return None

    # def read_asset_info_file(self, asset_path) -> dict:
    #     """Read the description file of the asset"""

    #     description_path = self.get_description_path(asset_path)
    #     return self.read_file(description_path)

    # def write_description_file(self, asset_data, asset_path) -> None:
    #     description_path = self.get_description_path(asset_path)
    #     return write_file(description_path, asset_data)

    def write_asset(self, asset, asset_path):
        """Write the ``asset`` datablock into the blend file ``asset_path``.

        The parent directories are created when missing.
        """
        Path(asset_path).parent.mkdir(exist_ok=True, parents=True)

        bpy.data.libraries.write(
            str(asset_path),
            {asset},
            path_remap="NONE",
            fake_user=True,
            compress=True
        )

    # def read_catalog(self, directory=None):
    #     """Read the catalog file of the library target directory or of the specified directory"""
    #     catalog_path = self.get_catalog_path(directory)

    #     if not catalog_path.exists():
    #         return {}

    #     cat_data = {}

    #     for line in catalog_path.read_text(encoding="utf-8").split('\n'):
    #         if line.startswith(('VERSION', '#')) or not line:
    #             continue

    #         cat_id, cat_path, cat_name = line.split(':')
    #         cat_data[cat_path] = {'id':cat_id, 'name':cat_name}

    #     return cat_data

    # def write_catalog(self, catalog_data, directory=None):
    #     """Write the catalog file in the library target directory or of the specified directory"""

    #     catalog_path = self.get_catalog_path(directory)

    #     lines = ['VERSION 1', '']

    #     # Add missing parents catalog
    #     norm_data = {}
    #     for cat_path, cat_data in catalog_data.items():
    #         norm_data[cat_path] = cat_data
    #         for p in Path(cat_path).parents[:-1]:
    #             if p in cat_data or p in norm_data:
    #                 continue

    #             norm_data[p.as_posix()] = {'id': str(uuid.uuid4()), 'name': '-'.join(p.parts)}

    #     for cat_path, cat_data in sorted(norm_data.items()):
    #         cat_name = cat_data['name'].replace('/', '-')
    #         lines.append(f"{cat_data['id']}:{cat_path}:{cat_name}")

    #     print(f'Catalog writen at: {catalog_path}')
    #     catalog_path.write_text('\n'.join(lines), encoding="utf-8")

    # def read_cache(self, cache_path=None):
    #     cache_path = cache_path or self.cache_file
    #     print(f'Read cache from {cache_path}')
    #     return self.read_file(cache_path)

    # def write_cache(self, asset_infos, cache_path=None):
    #     cache_path = cache_path or self.cache_file
    #     print(f'cache file writen to {cache_path}')
    #     return write_file(cache_path, list(asset_infos))

    def prop_rel_path(self, path, prop):
        """Get a filepath relative to a property of the plugin.

        The result has the form ``{<prop>}/<relative posix path>``. The
        property value has its environment variables expanded and is resolved
        before the comparison.

        Raises:
            ValueError: from ``Path.relative_to`` when ``path`` is not located
                under the property value.
        """
        field_prop = '{%s}/' % prop

        prop_value = getattr(self, prop)
        prop_value = Path(os.path.expandvars(prop_value)).resolve()

        rel_path = Path(path).resolve().relative_to(prop_value).as_posix()

        return field_prop + rel_path

    def format_from_ext(self, ext):
        """Return the Blender image ``file_format`` identifier for an extension.

        One leading dot is dropped and the extension is upper-cased. Extensions
        whose identifier differs from their upper-cased form are mapped
        explicitly; any other extension is returned upper-cased.
        """
        if ext.startswith('.'):
            ext = ext[1:]

        file_format = ext.upper()

        # Extensions that do not match the identifier of the format.
        aliases = {
            'JPG': 'JPEG',
            'EXR': 'OPEN_EXR',
            'TIF': 'TIFF',
            'TGA': 'TARGA',
        }

        return aliases.get(file_format, file_format)

    def save_image(self, image, filepath, remove=False):
        """Save ``image`` to ``filepath``, the format being taken from the suffix.

        Args:
            image: an image datablock, or a path (``str``/``Path``) that is
                loaded first.
            filepath: destination path.
            remove: remove the image datablock once saved.

        Returns:
            The image datablock, or ``None`` when ``remove`` is true.
        """
        filepath = Path(filepath)

        if isinstance(image, (str, Path)):
            image = bpy.data.images.load(str(image))
            image.update()

        image.filepath_raw = str(filepath)
        image.file_format = self.format_from_ext(filepath.suffix)
        image.save()

        if remove:
            bpy.data.images.remove(image)
            return None

        return image

    def write_preview(self, preview, filepath):
        """Write the pixels of ``preview`` to an image file at ``filepath``.

        Does nothing when either argument is falsy. The temporary image
        datablock is removed once saved.
        """
        if not preview or not filepath:
            return

        filepath = Path(filepath)
        filepath.parent.mkdir(parents=True, exist_ok=True)

        width, height = preview.image_size[0], preview.image_size[1]

        # Four values per pixel.
        px = [0] * width * height * 4
        preview.image_pixels_float.foreach_get(px)

        img = bpy.data.images.new(
            name=filepath.name,
            width=width,
            height=height,
            is_data=True,
            alpha=True,
        )
        img.pixels.foreach_set(px)

        self.save_image(img, filepath, remove=True)

    def draw_header(self, layout):
        """Draw the header of the Asset Browser Window"""
        #layout.separator()
        self.module_type.gui.draw_header(layout)

    def draw_context_menu(self, layout):
        """Draw the context menu of the Asset Browser Window"""
        self.module_type.gui.draw_context_menu(layout)

    def generate_blend_preview(self, asset_info):
        """Make sure a preview image exists for the blend of ``asset_info``.

        In order: keep the destination image when it already exists, copy the
        source image referenced by the 'image' key when it exists, otherwise
        run the ``blender-thumbnailer`` executable located next to the Blender
        binary. When no image was produced, the 'empty_preview.png' resource is
        copied instead.

        Returns:
            ``None`` when the image already existed or was copied from the
            source image, otherwise whether the thumbnailer produced the image.
        """
        asset_name = asset_info['name']
        catalog = asset_info['catalog']

        asset_path = self.format_path(asset_info['filepath'])
        # Argument order follows the signature of get_image_path
        # (name, catalog, filepath).
        dst_image_path = self.get_image_path(asset_name, catalog, asset_path)

        if dst_image_path.exists():
            return

        # Check if a source image exists and if so copying it in the new directory
        src_image_path = asset_info.get('image')
        if src_image_path:
            # NOTE(review): get_template_path is not defined in this class,
            # it has to be provided by the plugin - confirm.
            src_image_path = self.get_template_path(src_image_path, asset_name, asset_path, catalog)
            if src_image_path and src_image_path.exists():
                self.copy_file(src_image_path, dst_image_path)
                return

        print(f'Thumbnailing {asset_path} to {dst_image_path}')
        blender_thumbnailer = Path(bpy.app.binary_path).parent / 'blender-thumbnailer'

        dst_image_path.parent.mkdir(exist_ok=True, parents=True)

        subprocess.call([str(blender_thumbnailer), str(asset_path), str(dst_image_path)])

        success = dst_image_path.exists()

        if not success:
            empty_preview = RESOURCES_DIR / 'empty_preview.png'
            self.copy_file(str(empty_preview), str(dst_image_path))

        return success

    '''
    def generate_asset_preview(self, asset_info):
        """Only generate preview when conforming a library"""

        #print('\ngenerate_preview', asset_info['filepath'])

        scn = bpy.context.scene
        #Creating the preview for collection, object or material
        camera = scn.camera
        vl = bpy.context.view_layer

        data_type = self.data_type #asset_info['data_type']
        asset_path = self.format_path(asset_info['filepath'])

        # Check if a source video exists and if so copying it in the new directory
        if self.library.template_video:
            for asset_data in asset_info['assets']:
                dst_asset_path = self.get_asset_bundle_path(asset_data)
                dst_video_path = self.format_path(self.library.template_video, asset_data, filepath=dst_asset_path) #Template(src_video_path).find(asset_data, asset_path=dst_asset_path, **self.format_data)

                if dst_video_path.exists():
                    print(f'The dest video {dst_video_path} already exist')
                    continue

                src_video_template = asset_data.get('video')
                if not src_video_template:
                    continue

                src_video_path = self.find_path(src_video_template, asset_data, filepath=asset_path)#Template(src_video_path).find(asset_data, asset_path=dst_asset_path, **self.format_data)
                if src_video_path:
                    print(f'Copy video from {src_video_path} to {dst_video_path}')
                    self.copy_file(src_video_path, dst_video_path)

        # Check if asset as a preview image or need it to be generated
        asset_data_names = {}

        if self.library.template_image:
            for asset_data in asset_info['assets']:
                name = asset_data['name']
                dst_asset_path = self.get_asset_bundle_path(asset_data)

                dst_image_path = self.format_path(self.library.template_image, asset_data, filepath=dst_asset_path)
                if dst_image_path.exists():
                    print(f'The dest image {dst_image_path} already exist')
                    continue

                # Check if a source image exists and if so copying it in the new directory
                src_image_template = asset_data.get('image')
                if src_image_template:
                    src_image_path = self.find_path(src_image_template, asset_data, filepath=asset_path)

                    if src_image_path:
                        if src_image_path.suffix == dst_image_path.suffix:
                            self.copy_file(src_image_path, dst_image_path)
                        else:
                            #img = bpy.data.images.load(str(src_image_path))
                            self.save_image(src_image_path, dst_image_path, remove=True)

                        return

                #Store in a dict all asset_data that does not have preview
                asset_data_names[name] = dict(asset_data, image_path=dst_image_path)

        if not asset_data_names:
            # No preview to generate
            return

        print('Making Preview for', asset_data_names)

        asset_names = list(asset_data_names.keys())
        assets = self.load_datablocks(asset_path, names=asset_names, link=True, type=data_type)

        for asset in assets:
            if not asset:
                continue

            asset_data = asset_data_names[asset.name]
            image_path = asset_data['image_path']

            if asset.preview:
                print(f'Writing asset preview to {image_path}')
                self.write_preview(asset.preview, image_path)
                continue

            if data_type == 'COLLECTION':

                bpy.ops.object.collection_instance_add(name=asset.name)

                bpy.ops.view3d.camera_to_view_selected()
                instance = vl.objects.active

                #scn.collection.children.link(asset)

                scn.render.filepath = str(image_path)

                print(f'Render asset {asset.name} to {image_path}')
                bpy.ops.render.render(write_still=True)

                #instance.user_clear()
                asset.user_clear()

                bpy.data.objects.remove(instance)

                bpy.ops.outliner.orphans_purge(do_local_ids=True, do_linked_ids=True, do_recursive=True)
    '''

    # def set_asset_catalog(self, asset, asset_data, catalog_data):
    #     """Find the catalog if already exist or create it"""

    #     catalog_name = asset_data['catalog']
    #     catalog = catalog_data.get(catalog_name)

    #     catalog_item = self.catalog.add(asset_data['catalog'])
    #     asset.asset_data.catalog_id = catalog_item.id

    #     if not catalog:
    #         catalog = {'id': str(uuid.uuid4()), 'name': catalog_name}
    #         catalog_data[catalog_name] = catalog

    #     asset.asset_data.catalog_id = catalog['id']

    def set_asset_metadata(self, asset, asset_cache):
        """Create custom props on an asset based on ``asset_cache.metadata``."""
        for key, value in asset_cache.metadata.items():
            asset.asset_data[key] = value

    def set_asset_tags(self, asset, asset_cache):
        """Replace the asset tags with ``asset_cache.tags``.

        The existing tags are left untouched when ``asset_cache.tags`` is
        ``None``.
        """
        if asset_cache.tags is None:
            return

        tags = asset.asset_data.tags

        # Iterate over a copy: the collection is modified while looping.
        for tag in list(tags):
            tags.remove(tag)

        for tag in asset_cache.tags:
            tags.new(tag, skip_if_exists=True)

    def set_asset_info(self, asset, asset_cache):
        """Set the asset author and description from ``asset_cache``."""
        asset.asset_data.author = asset_cache.author
        asset.asset_data.description = asset_cache.description

    def get_asset_bundle_path(self, asset_cache):
        """Get the bundle path for that asset.

        The path is made of the bundle directory, the first
        ``library.blend_depth`` parts of the asset catalog, then
        ``<norm_name>/<norm_name>.blend``.
        """
        catalog_parts = asset_cache.catalog_item.parts
        blend_name = asset_cache.norm_name

        path_parts = catalog_parts[:self.library.blend_depth]

        return Path(self.bundle_directory, *path_parts, blend_name, blend_name).with_suffix('.blend')

    def bundle(self, cache_diff=None):
        """Group all new assets in one or multiple blends for the asset browser.

        Args:
            cache_diff: the differences to apply. When falsy, they are
                computed from ``fetch()`` and the previews are generated
                first. A ``str`` or ``Path`` is read with ``LibraryCacheDiff``.

        Each bundle blend is opened (or created from an empty home file),
        updated according to the 'ADD', 'REMOVE' and 'MODIFY' operations and
        saved. The catalog is written at the end, then Blender is quit.
        """
        supported_types = ('FILE', 'ACTION', 'COLLECTION')

        if self.data_type not in supported_types:
            print(f'{self.data_type} is not supported yet supported types are {supported_types}')
            return

        catalog = self.read_catalog()

        cache = None
        write_cache = False

        if not cache_diff:
            # Get list of all modifications
            cache = self.fetch()
            cache_diff = cache.diff()

            # Write the cache in a temporary file for the generate preview script
            tmp_cache_file = cache.write(tmp=True)
            bpy.ops.assetlibrary.generate_previews(name=self.library.name, cache=str(tmp_cache_file))

        elif isinstance(cache_diff, (Path, str)):
            # NOTE(review): the import of LibraryCacheDiff is commented out at
            # the top of the file, this branch raises NameError until it is
            # restored from its current module.
            cache_diff = LibraryCacheDiff(cache_diff).read()#json.loads(Path(cache_diff).read_text(encoding='utf-8'))

        total_diffs = len(cache_diff)
        print(f'Total Diffs={total_diffs}')

        if total_diffs == 0:
            print('No assets found')
            return

        i = 0
        for bundle_path, asset_diffs in cache_diff.group_by(self.get_asset_bundle_path):
            if bundle_path.exists():
                print(f'Opening existing bundle blend: {bundle_path}')
                bpy.ops.wm.open_mainfile(filepath=str(bundle_path))
            else:
                print(f'Create new bundle blend to: {bundle_path}')
                bpy.ops.wm.read_homefile(use_empty=True)

            for asset_diff in asset_diffs:
                # Every diff is printed up to 100 diffs, about one out of ten
                # percent steps beyond.
                if total_diffs <= 100 or i % int(total_diffs / 10) == 0:
                    print(f'Progress: {int(i / total_diffs * 100)+1}')

                # Incremented here so that removed assets, which skip the rest
                # of the loop body, are counted in the progress too.
                i += 1

                operation = asset_diff.operation
                asset_cache = asset_diff.asset_cache
                datablocks = getattr(bpy.data, self.data_types)
                asset = datablocks.get(asset_cache.name)

                if operation == 'REMOVE':
                    if asset:
                        datablocks.remove(asset)
                    else:
                        print(f'ERROR : Remove Asset: {asset_cache.name} not found in {bundle_path}')
                    continue

                if operation == 'MODIFY' and not asset:
                    print(f'WARNING: Modifiy Asset: {asset_cache.name} not found in {bundle_path} it will be created')

                if operation == 'ADD' or not asset:
                    if asset:
                        #raise Exception(f"Asset {asset_data['name']} Already in Blend")
                        print(f"Asset {asset_cache.name} Already in Blend")
                        datablocks.remove(asset)

                    #print(f"INFO: Add new asset: {asset_data['name']}")
                    asset = datablocks.new(name=asset_cache.name)

                asset.asset_mark()

                self.set_asset_preview(asset, asset_cache)

                #if not asset_preview:
                #    assets_to_preview.append((asset_data['filepath'], asset_data['name'], asset_data['data_type']))
                #if self.externalize_data:
                #    self.write_preview(preview, filepath)

                #self.set_asset_catalog(asset, asset_data['catalog'])

                asset.asset_data.catalog_id = catalog.add(asset_cache.catalog).id

                self.set_asset_metadata(asset, asset_cache)
                self.set_asset_tags(asset, asset_cache)
                self.set_asset_info(asset, asset_cache)

            #self.write_asset_preview_file()

            print(f'Saving Blend to {bundle_path}')

            bundle_path.parent.mkdir(exist_ok=True, parents=True)
            bpy.ops.wm.save_as_mainfile(filepath=str(bundle_path), compress=True)

        if write_cache:
            cache.write()

        #self.write_catalog(catalog_data)
        catalog.write()

        bpy.ops.wm.quit_blender()

    # def unflatten_cache(self, cache):
    #     """ Return a new unflattten list of asset data
    #     grouped by filepath"""

    #     new_cache = []

    #     cache = deepcopy(cache)

    #     cache.sort(key=lambda x : x['filepath'])
    #     groups = groupby(cache, key=lambda x : x['filepath'])

    #     keys = ['filepath', 'modified', 'library_id']

    #     for _, asset_datas in groups:
    #         asset_datas = list(asset_datas)

    #         #print(asset_datas[0])

    #         asset_info = {k:asset_datas[0][k] for k in keys}
    #         asset_info['assets'] = [{k:v for k, v in a.items() if k not in keys+['operation']} for a in asset_datas]

    #         new_cache.append(asset_info)

    #     return new_cache

    # def flatten_cache(self, cache):
    #     """ Return a new flat list of asset data
    #     the filepath keys are merge with the assets keys"""

    #     # If the cache has a wrong format
    #     if not cache or not isinstance(cache[0], dict):
    #         return []

    #     new_cache = []

    #     for asset_info in cache:
    #         asset_info = asset_info.copy()
    #         if 'assets' in asset_info:

    #             assets = asset_info.pop('assets')
    #             for asset_data in assets:
    #                 new_cache.append({**asset_info, **asset_data})
    #         else:
    #             new_cache.append(asset_info)

    #     return new_cache

    # def diff(self, asset_infos=None):
    #     """Compare the library cache with it current state and return the new cache and the difference"""

    #     cache = self.read_cache()

    #     if cache is None:
    #         print(f'Fetch The library {self.library.name} for the first time, might be long...')
    #         cache = []

    #     asset_infos = asset_infos or self.fetch()

    #     cache = {f"{a['filepath']}/{a['name']}": a for a in self.flatten_cache(cache)}
    #     new_cache = {f"{a['filepath']}/{a['name']}" : a for a in self.flatten_cache(asset_infos)}

    #     assets_added = [v for k, v in new_cache.items() if k not in cache]
    #     assets_removed = [v for k, v in cache.items() if k not in new_cache]
    #     assets_modified = [v for k, v in cache.items() if v not in assets_removed and v!= new_cache[k]]

    #     if assets_added:
    #         print(f'{len(assets_added)} Assets Added \n{tuple(a["name"] for a in assets_added[:10])}\n')
    #     if assets_removed:
    #         print(f'{len(assets_removed)} Assets Removed \n{tuple(a["name"] for a in assets_removed[:10])}\n')
    #     if assets_modified:
    #         print(f'{len(assets_modified)} Assets Modified \n{tuple(a["name"] for a in assets_modified[:10])}\n')

    #     assets_added = [dict(a, operation='ADD') for a in assets_added]
    #     assets_removed = [dict(a, operation='REMOVE') for a in assets_removed]
    #     assets_modified = [dict(a, operation='MODIFY') for a in assets_modified]

    #     cache_diff = assets_added + assets_removed + assets_modified
    #     if not cache_diff:
    #         print('No change in the library')

    #     return list(new_cache.values()), cache_diff

    def draw_prefs(self, layout):
        """Draw the options in the addon preference for this plugin.

        One property is drawn per annotation declared on the class of the
        plugin, labelled with ``bpy.path.display_name`` of its name.
        """
        for prop_name in self.__class__.__annotations__:
            layout.prop(self, prop_name, text=bpy.path.display_name(prop_name))