"""Generic python functions to make operation on file and names""" import fnmatch import json import platform import re import subprocess import unicodedata import os from pathlib import Path import importlib import sys import shutil def install_module(module_name, package_name=None): '''Install a python module with pip or return it if already installed''' try: module = importlib.import_module(module_name) except ModuleNotFoundError: print(f'Installing Module {module_name} ....') subprocess.call([sys.executable, '-m', 'ensurepip']) subprocess.call([sys.executable, '-m', 'pip', 'install', package_name or module_name]) module = importlib.import_module(module_name) return module def import_module_from_path(path): from importlib import util try: path = Path(path) spec = util.spec_from_file_location(path.stem, str(path)) mod = util.module_from_spec(spec) spec.loader.exec_module(mod) return mod except Exception as e: print(f'Cannot import file {path}') print(e) def norm_str(string, separator='_', format=str.lower, padding=0): string = str(string) string = string.replace('_', ' ') string = string.replace('-', ' ') string = re.sub('[ ]+', ' ', string) string = re.sub('[ ]+\/[ ]+', '/', string) string = string.strip() if format: string = format(string) # Padd rightest number string = re.sub(r'(\d+)(?!.*\d)', lambda x : x.group(1).zfill(padding), string) string = string.replace(' ', separator) string = unicodedata.normalize('NFKD', string).encode('ASCII', 'ignore').decode("utf-8") return string def remove_version(filepath): pattern = '_v[0-9]+\.' search = re.search(pattern, filepath) if search: filepath = filepath.replace(search.group()[:-1], '') return Path(filepath).name def is_exclude(name, patterns) -> bool: # from fnmatch import fnmatch if not isinstance(patterns, (list,tuple)) : patterns = [patterns] return any([fnmatch(name, p) for p in patterns]) def get_last_files(root, pattern=r'_v\d{3}\.\w+', only_matching=False, ex_file=None, ex_dir=None, keep=1, verbose=False) -> list: '''Recursively get last(s) file(s) (when there is multiple versions) in passed directory root -> str: Filepath of the folder to scan. pattern -> str: Regex pattern to group files. only_matching -> bool: Discard files that aren't matched by regex pattern. ex_file -> list : List of fn_match pattern to exclude files. ex_dir -> list : List of fn_match pattern of directory name to skip. keep -> int: Number of lasts versions to keep when there are mutliple versionned files (e.g: 1 keep only last). verbose -> bool: Print infos in console. ''' files = [] if ex_file is None: all_items = [f for f in os.scandir(root)] else: all_items = [f for f in os.scandir(root) if not is_exclude(f.name, ex_file)] allfiles = [f for f in all_items if f.is_file()] # Need to sort to effectively group separated key in list allfiles.sort(key=lambda x: x.name) dirs = [f for f in all_items if f.is_dir()] for i in range(len(allfiles)-1,-1,-1):# fastest way to iterate on index in reverse if not re.search(pattern, allfiles[i].name): if only_matching: allfiles.pop(i) else: files.append(allfiles.pop(i).path) # separate remaining files in prefix grouped lists lilist = [list(v) for k, v in groupby(allfiles, key=lambda x: re.split(pattern, x.name)[0])] # get only item last of each sorted grouplist for l in lilist: versions = sorted(l, key=lambda x: x.name)[-keep:] # exclude older for f in versions: files.append(f.path) if verbose and len(l) > 1: print(f'{root}: keep {str([x.name for x in versions])} out of {len(l)} elements') for d in dirs: # recursively treat all detected directory if ex_dir and is_exclude(d.name, ex_dir): # skip folder with excluded name continue files += get_last_files( d.path, pattern=pattern, only_matching=only_matching, ex_file=ex_file, ex_dir=ex_dir, keep=keep) return sorted(files) def copy_file(src, dst, only_new=False, only_recent=False): if dst.exists(): if only_new: return elif only_recent and dst.stat().st_mtime >= src.stat().st_mtime: return dst.parent.mkdir(exist_ok=True, parents=True) print(f'Copy file from {src} to {dst}') if platform.system() == 'Windows': subprocess.call(['copy', str(src), str(dst)], shell=True) else: subprocess.call(['cp', str(src), str(dst)]) def copy_dir(src, dst, only_new=False, only_recent=False, excludes=['.*'], includes=[]): src, dst = Path(src), Path(dst) if includes: includes = r'|'.join([fnmatch.translate(x) for x in includes]) if excludes: excludes = r'|'.join([fnmatch.translate(x) for x in excludes]) if dst.is_dir(): dst.mkdir(exist_ok=True, parents=True) else: dst.parent.mkdir(exist_ok=True, parents=True) if src.is_file(): copy_file(src, dst, only_new=only_new, only_recent=only_recent) elif src.is_dir(): src_files = list(src.rglob('*')) if excludes: src_files = [f for f in src_files if not re.match(excludes, f.name)] if includes: src_files = [f for f in src_files if re.match(includes, f.name)] dst_files = [dst/f.relative_to(src) for f in src_files] for src_file, dst_file in zip(src_files, dst_files) : if src_file.is_dir(): dst_file.mkdir(exist_ok=True, parents=True) else: copy_file(src_file, dst_file, only_new=only_new, only_recent=only_recent) def open_file(filepath, select=False): '''Open a filepath inside the os explorer''' if platform.system() == 'Darwin': # macOS cmd = ['open'] elif platform.system() == 'Windows': # Windows cmd = ['explorer'] if select: cmd += ['/select,'] else: # linux variants cmd = ['xdg-open'] if select: cmd = ['nemo'] cmd += [str(filepath)] subprocess.Popen(cmd) def open_blender_file(filepath=None): filepath = filepath or bpy.data.filepath cmd = sys.argv # if no filepath, use command as is to reopen blender if filepath != '': if len(cmd) > 1 and cmd[1].endswith('.blend'): cmd[1] = str(filepath) else: cmd.insert(1, str(filepath)) subprocess.Popen(cmd) def read_file(path): '''Read a file with an extension in (json, yaml, yml, txt)''' exts = ('.json', '.yaml', '.yml', '.txt') if not path: print('Try to read empty file') path = Path(path) if not path.exists(): print('File not exist', path) return if path.suffix not in exts: print(f'Cannot read file {path}, extension must be in {exts}') return txt = path.read_text() data = None if path.suffix.lower() in ('.yaml', '.yml'): yaml = install_module('yaml') try: data = yaml.safe_load(txt) except Exception: print(f'Could not load yaml file {path}') return elif path.suffix.lower() == '.json': try: data = json.loads(txt) except Exception: print(f'Could not load json file {path}') return else: data = txt return data def write_file(path, data, indent=4): '''Read a file with an extension in (json, yaml, yml, text)''' exts = ('.json', '.yaml', '.yml', '.txt') if not path: print('Try to write empty file') path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) if path.suffix not in exts: print(f'Cannot read file {path}, extension must be in {exts}') return if path.suffix.lower() in ('.yaml', '.yml'): yaml = install_module('yaml') try: path.write_text(yaml.dump(data), encoding='utf8') except Exception as e: print(e) print(f'Could not write yaml file {path}') return elif path.suffix.lower() == '.json': try: path.write_text(json.dumps(data, indent=indent), encoding='utf8') except Exception as e: print(e) print(f'Could not write json file {path}') return else: data = path.write_text(data, encoding='utf8') def synchronize(src, dst, only_new=False, only_recent=False, clear=False): #actionlib_dir = get_actionlib_dir(custom=custom) #local_actionlib_dir = get_actionlib_dir(local=True, custom=custom) try: if clear and Path(dst).exists(): shutil.rmtree(dst) #set_actionlib_dir(custom=custom) script = Path(__file__).parent / 'synchronize.py' cmd = [ sys.executable, script, '--src', str(src), '--dst', str(dst), '--only-new', json.dumps(only_new), '--only-recent', json.dumps(only_recent), ] subprocess.Popen(cmd) except Exception as e: print(e)