318 lines
9.5 KiB
Python
318 lines
9.5 KiB
Python
|
|
"""Generic python functions to make operation on file and names"""
|
|
|
|
import fnmatch
|
|
import json
|
|
import platform
|
|
import re
|
|
import subprocess
|
|
import unicodedata
|
|
import os
|
|
from pathlib import Path
|
|
import importlib
|
|
import sys
|
|
import shutil
|
|
|
|
import contextlib
|
|
|
|
@contextlib.contextmanager
def cd(path):
    """Run the body of a ``with`` block from inside `path`.

    The directory that was current on entry is restored when the block
    exits, whether it finishes normally or raises.
    """
    origin = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        # Always go back, even when the managed block failed.
        os.chdir(origin)
|
|
|
|
def install_module(module_name, package_name=None):
    '''Import a python module, installing it with pip first when it is missing.

    module_name -> str: Importable name of the module (e.g. 'yaml').
    package_name -> str: Name given to pip when it differs from the module
        name (e.g. 'pyyaml'). Defaults to module_name.

    Return the imported module.
    Raise ModuleNotFoundError when the module still cannot be imported after
    the installation attempt.
    '''
    try:
        return importlib.import_module(module_name)
    except ModuleNotFoundError:
        print(f'Installing Module {module_name} ....')

    # Make sure pip itself exists for this interpreter, then install with it.
    subprocess.call([sys.executable, '-m', 'ensurepip'])
    subprocess.call([sys.executable, '-m', 'pip', 'install', package_name or module_name])

    # The import system caches directory listings; without this a package
    # installed while the interpreter is running may not be found.
    importlib.invalidate_caches()

    return importlib.import_module(module_name)
|
|
|
|
def import_module_from_path(path):
    '''Execute the python file at `path` and return it as a module.

    The module is named after the file stem. Any error raised while building
    or executing it is printed, and None is returned in that case.
    '''
    from importlib import util as import_util

    try:
        path = Path(path)
        module_spec = import_util.spec_from_file_location(path.stem, str(path))
        module = import_util.module_from_spec(module_spec)
        module_spec.loader.exec_module(module)
    except Exception as error:
        print(f'Cannot import file {path}')
        print(error)
    else:
        return module
|
|
|
|
def norm_str(string, separator='_', format=str.lower, padding=0):
    '''Normalize a string into a clean ASCII name.

    string -> any: Value to normalize, converted with str() first.
    separator -> str: Text placed between words in the result.
    format -> callable: Applied to the cleaned string (str.lower by default);
        pass a falsy value to keep the case untouched.
    padding -> int: Minimum width of the right-most number, zero filled.

    Return the normalized string.
    '''
    string = str(string)

    # Underscores, hyphens and repeated spaces all count as one word break.
    string = re.sub(r'[ _-]+', ' ', string)
    # No spaces around a slash that is surrounded by spaces.
    string = re.sub(r'[ ]+/[ ]+', '/', string)
    string = string.strip()

    if format:
        string = format(string)

    # Pad the right-most number (the negative lookahead rejects any match
    # that still has a digit somewhere after it).
    string = re.sub(r'(\d+)(?!.*\d)', lambda x: x.group(1).zfill(padding), string)

    string = string.replace(' ', separator)
    # Decompose accented characters, then drop everything that is not ASCII.
    string = unicodedata.normalize('NFKD', string).encode('ASCII', 'ignore').decode("utf-8")

    return string
|
|
|
|
def remove_version(filepath):
    '''Return the file name of `filepath` without its version tag.

    A version tag is '_v' followed by digits and placed right before a dot
    (e.g. 'char_v012.blend' gives 'char.blend'). The text of the first tag
    found is removed wherever it occurs in the path. Paths without a tag are
    returned as their plain file name.

    filepath -> str or Path: Path or file name to clean.
    '''
    # re and str.replace need text; this also lets callers pass a Path.
    filepath = str(filepath)

    match = re.search(r'_v[0-9]+\.', filepath)
    if match:
        # Remove the tag but keep the dot that was matched after it.
        filepath = filepath.replace(match.group()[:-1], '')

    return Path(filepath).name
|
|
|
|
def is_exclude(name, patterns) -> bool:
    '''Tell whether `name` matches at least one fnmatch pattern.

    name -> str: Name to test.
    patterns -> str, list or tuple: A single pattern or several of them.
    '''
    if not isinstance(patterns, (list, tuple)):
        patterns = [patterns]
    # `fnmatch` is imported as a module here, so the function must be
    # reached through it.
    return any(fnmatch.fnmatch(name, p) for p in patterns)
|
|
|
|
def get_last_files(root, pattern=r'_v\d{3}\.\w+', only_matching=False, ex_file=None, ex_dir=None, keep=1, verbose=False) -> list:
    '''Recursively get the last file(s) of each versioned group in a directory

    root -> str: Filepath of the folder to scan.
    pattern -> str: Regex pattern marking the version; files sharing the text
        found before it belong to the same group.
    only_matching -> bool: Discard files that aren't matched by regex pattern.
    ex_file -> list : List of fnmatch patterns of entry names to exclude
        (applied to every scanned entry, directories included).
    ex_dir -> list : List of fnmatch patterns of directory names to skip.
    keep -> int: Number of last versions to keep when there are multiple
        versioned files (e.g: 1 keeps only the last).
    verbose -> bool: Print infos in console.

    Return the sorted list of kept file paths.
    '''
    entries = list(os.scandir(root))
    if ex_file is not None:
        entries = [e for e in entries if not is_exclude(e.name, ex_file)]

    files = []
    groups = {}
    # Visiting in name order leaves every group sorted from oldest to newest.
    for entry in sorted((e for e in entries if e.is_file()), key=lambda e: e.name):
        if re.search(pattern, entry.name):
            prefix = re.split(pattern, entry.name)[0]
            groups.setdefault(prefix, []).append(entry)
        elif not only_matching:
            # Unversioned files are always kept unless explicitly discarded.
            files.append(entry.path)

    for group in groups.values():
        # NOTE: keep=0 keeps every version, because [-0:] is the whole list.
        versions = group[-keep:]  # exclude older
        files.extend(f.path for f in versions)

        if verbose and len(group) > 1:
            print(f'{root}: keep {str([x.name for x in versions])} out of {len(group)} elements')

    for d in entries:  # recursively treat all detected directories
        if not d.is_dir():
            continue
        if ex_dir and is_exclude(d.name, ex_dir):
            # skip folder with excluded name
            continue
        files += get_last_files(
            d.path, pattern=pattern, only_matching=only_matching,
            ex_file=ex_file, ex_dir=ex_dir, keep=keep, verbose=verbose)

    return sorted(files)
|
|
|
|
def copy_file(src, dst, only_new=False, only_recent=False):
    '''Copy a single file with the copy command of the platform.

    src -> str or Path: File to copy.
    dst -> str or Path: Destination file; missing parent folders are created.
    only_new -> bool: Do nothing when dst already exists.
    only_recent -> bool: Do nothing when dst exists and is at least as recent
        as src (modification time).

    The exit status of the copy command is not checked.
    '''
    # Accept plain strings as well, the same way copy_dir does.
    src, dst = Path(src), Path(dst)

    if dst.exists():
        if only_new:
            return
        if only_recent and dst.stat().st_mtime >= src.stat().st_mtime:
            return

    dst.parent.mkdir(exist_ok=True, parents=True)
    print(f'Copy file from {src} to {dst}')
    if platform.system() == 'Windows':
        # `copy` is a builtin of the Windows shell, hence shell=True.
        subprocess.call(['copy', str(src), str(dst)], shell=True)
    else:
        subprocess.call(['cp', str(src), str(dst)])
|
|
|
|
def copy_dir(src, dst, only_new=False, only_recent=False, excludes=['.*'], includes=[]):
    '''Copy a file, or the filtered content of a directory tree, to `dst`.

    src -> str or Path: File or directory to copy.
    dst -> str or Path: Destination file or directory.
    only_new, only_recent -> bool: Forwarded to copy_file.
    excludes -> list: fnmatch patterns; entries whose name matches are skipped.
    includes -> list: fnmatch patterns; when given, only entries whose name
        matches are copied.

    Patterns are tested against the name of each entry, not its full path.
    '''
    src = Path(src)
    dst = Path(dst)

    # Each non-empty pattern list becomes one alternation regex.
    if includes:
        includes = r'|'.join(fnmatch.translate(p) for p in includes)
    if excludes:
        excludes = r'|'.join(fnmatch.translate(p) for p in excludes)

    folder = dst if dst.is_dir() else dst.parent
    folder.mkdir(exist_ok=True, parents=True)

    if src.is_file():
        copy_file(src, dst, only_new=only_new, only_recent=only_recent)
        return
    if not src.is_dir():
        return

    # Snapshot the tree before anything is written.
    for entry in list(src.rglob('*')):
        if excludes and re.match(excludes, entry.name):
            continue
        if includes and not re.match(includes, entry.name):
            continue

        target = dst / entry.relative_to(src)
        if entry.is_dir():
            target.mkdir(exist_ok=True, parents=True)
        else:
            copy_file(entry, target, only_new=only_new, only_recent=only_recent)
|
|
|
|
|
|
def open_file(filepath, select=False):
    '''Hand `filepath` to the file explorer / default opener of the OS.

    filepath -> str or Path: Path passed to the launched command.
    select -> bool: Use the 'select' variant of the command on Windows and
        Linux; it has no effect on macOS.
    '''
    system = platform.system()

    if system == 'Darwin':  # macOS
        cmd = ['open']
    elif system == 'Windows':
        cmd = ['explorer', '/select,'] if select else ['explorer']
    else:  # linux variants
        cmd = ['nemo'] if select else ['xdg-open']

    subprocess.Popen(cmd + [str(filepath)])
|
|
|
|
def open_blender_file(filepath=None):
    '''Start a new process from the current command line, opening `filepath`.

    filepath -> str or Path: Blend file to open. When empty or None, the path
        of the file currently opened in Blender (bpy.data.filepath) is used.

    NOTE(review): presumably sys.argv[0] is the Blender executable when this
    runs inside Blender - confirm against the callers.
    '''
    if not filepath:
        # bpy only exists inside Blender, so import it only when needed.
        import bpy
        filepath = bpy.data.filepath

    # Work on a copy: sys.argv is shared state and must not be edited by
    # building the command.
    cmd = list(sys.argv)

    # if no filepath, use command as is to reopen blender
    if filepath != '':
        if len(cmd) > 1 and cmd[1].endswith('.blend'):
            cmd[1] = str(filepath)
        else:
            cmd.insert(1, str(filepath))

    subprocess.Popen(cmd)
|
|
|
|
def read_file(path):
    '''Read a file with an extension in (json, yaml, yml, txt)

    path -> str or Path: File to read; the extension check ignores case.

    Return the parsed data for json and yaml files, the raw text for txt
    files, or None (after printing the reason) when the path is empty, the
    file is missing, the extension is not supported or parsing fails.
    '''
    exts = ('.json', '.yaml', '.yml', '.txt')

    if not path:
        print('Try to read empty file')
        return

    path = Path(path)
    if not path.exists():
        print('File not exist', path)
        return

    # Compare in lower case so the check agrees with the dispatch below.
    suffix = path.suffix.lower()
    if suffix not in exts:
        print(f'Cannot read file {path}, extension must be in {exts}')
        return

    # write_file writes utf8, so read the same encoding back.
    txt = path.read_text(encoding='utf8')

    if suffix in ('.yaml', '.yml'):
        # The `yaml` module is distributed on PyPI as `pyyaml`.
        yaml = install_module('yaml', 'pyyaml')
        try:
            return yaml.safe_load(txt)
        except Exception:
            print(f'Could not load yaml file {path}')
            return

    if suffix == '.json':
        try:
            return json.loads(txt)
        except Exception:
            print(f'Could not load json file {path}')
            return

    return txt
|
|
|
|
def write_file(path, data, indent=4):
    '''Write a file with an extension in (json, yaml, yml, txt)

    path -> str or Path: File to write; the extension check ignores case and
        missing parent folders are created.
    data -> any: Object dumped as json or yaml, or the text written as is
        for txt files.
    indent -> int: Indentation used for json output.

    Nothing is written (and the reason is printed) when the path is empty,
    the extension is not supported or serialization fails.
    '''
    exts = ('.json', '.yaml', '.yml', '.txt')

    if not path:
        print('Try to write empty file')
        return

    path = Path(path)

    # Compare in lower case so the check agrees with the dispatch below.
    suffix = path.suffix.lower()
    if suffix not in exts:
        print(f'Cannot write file {path}, extension must be in {exts}')
        return

    # Only touch the disk once the request is known to be valid.
    path.parent.mkdir(parents=True, exist_ok=True)

    if suffix in ('.yaml', '.yml'):
        # The `yaml` module is distributed on PyPI as `pyyaml`.
        yaml = install_module('yaml', 'pyyaml')
        try:
            path.write_text(yaml.dump(data), encoding='utf8')
        except Exception as e:
            print(e)
            print(f'Could not write yaml file {path}')
        return

    if suffix == '.json':
        try:
            path.write_text(json.dumps(data, indent=indent), encoding='utf8')
        except Exception as e:
            print(e)
            print(f'Could not write json file {path}')
        return

    path.write_text(data, encoding='utf8')
|
|
|
|
|
|
def synchronize(src, dst, only_new=False, only_recent=False, clear=False):
    '''Start the 'synchronize.py' script located next to this file.

    src, dst -> str or Path: Passed to the script as --src and --dst.
    only_new, only_recent -> bool: Passed json encoded as --only-new and
        --only-recent.
    clear -> bool: Delete dst beforehand when it exists.

    The script runs in a separate process that is not waited for. Any error
    raised while preparing or starting it is printed, not propagated.
    '''
    try:
        if clear and Path(dst).exists():
            shutil.rmtree(dst)

        script = Path(__file__).parent / 'synchronize.py'
        options = {
            '--src': str(src),
            '--dst': str(dst),
            '--only-new': json.dumps(only_new),
            '--only-recent': json.dumps(only_recent),
        }

        cmd = [sys.executable, script]
        for flag, value in options.items():
            cmd += [flag, value]

        subprocess.Popen(cmd)
    except Exception as e:
        print(e)
|
|
|