asset_library/common/file_utils.py

"""Generic python functions to make operation on file and names"""
import contextlib
import fnmatch
import importlib
import json
import os
import platform
import re
import shutil
import subprocess
import sys
import unicodedata
from itertools import groupby
from pathlib import Path


@contextlib.contextmanager
def cd(path):
"""Changes working directory and returns to previous on exit."""
prev_cwd = Path.cwd()
os.chdir(path)
try:
yield
finally:
os.chdir(prev_cwd)
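
# Illustrative usage sketch for cd(); the block is commented out so it never
# runs on import, and the directory is only an arbitrary example.
#     with cd(Path.home()):
#         print(Path.cwd())   # the home directory
#     print(Path.cwd())       # back to the previous working directory
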
def install_module(module_name, package_name=None):
'''Install a python module with pip or return it if already installed'''
try:
module = importlib.import_module(module_name)
except ModuleNotFoundError:
        print(f'Installing module {module_name}...')
subprocess.call([sys.executable, '-m', 'ensurepip'])
subprocess.call([sys.executable, '-m', 'pip', 'install', package_name or module_name])
module = importlib.import_module(module_name)
return module
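
# Illustrative usage sketch for install_module(): pass package_name when the
# pip package name differs from the import name (commented out, not executed).
#     yaml = install_module('yaml', package_name='PyYAML')
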
def import_module_from_path(path):
    '''Import and return a python module from an arbitrary file path'''
    from importlib import util

    try:
        path = Path(path)
        spec = util.spec_from_file_location(path.stem, str(path))
        mod = util.module_from_spec(spec)
        spec.loader.exec_module(mod)
        return mod
    except Exception as e:
        print(f'Cannot import file {path}')
        print(e)
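
# Illustrative usage sketch for import_module_from_path(); the file path and
# function name are hypothetical (commented out, not executed).
#     tools = import_module_from_path('/path/to/custom_tools.py')
#     tools.some_function()
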
def norm_str(string, separator='_', format=str.lower, padding=0):
    '''Normalize a string: strip accents, collapse spaces, apply format and zero-pad the last number'''
    string = str(string)
    string = string.replace('_', ' ')
    string = string.replace('-', ' ')
    string = re.sub(r'[ ]+', ' ', string)
    string = re.sub(r'[ ]+/[ ]+', '/', string)
    string = string.strip()
    if format:
        string = format(string)
    # Zero-pad the rightmost number
    string = re.sub(r'(\d+)(?!.*\d)', lambda x: x.group(1).zfill(padding), string)
    string = string.replace(' ', separator)
    string = unicodedata.normalize('NFKD', string).encode('ASCII', 'ignore').decode('utf-8')
    return string
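
# Illustrative example for norm_str(): spaces and dashes become separators, the
# result is lowercased and the last number is zero-padded (commented out, not executed).
#     norm_str('Chair-Wood 3', padding=2)   # -> 'chair_wood_03'
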
def remove_version(filepath):
    '''Return the file name with its version suffix (e.g. _v001) stripped'''
    pattern = r'_v[0-9]+\.'
    search = re.search(pattern, filepath)
    if search:
        filepath = filepath.replace(search.group()[:-1], '')
    return Path(filepath).name
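
# Illustrative example for remove_version(); the path is hypothetical
# (commented out, not executed).
#     remove_version('/projects/props/chair_v012.blend')   # -> 'chair.blend'
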
def is_exclude(name, patterns) -> bool:
    '''Return True if name matches any of the given fnmatch pattern(s)'''
    if not isinstance(patterns, (list, tuple)):
        patterns = [patterns]
    return any(fnmatch.fnmatch(name, p) for p in patterns)
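
# Illustrative example for is_exclude() with fnmatch-style patterns
# (commented out, not executed).
#     is_exclude('scene.blend1', ['*.blend?', '.*'])   # -> True
#     is_exclude('scene.blend', ['*.blend?', '.*'])    # -> False
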
def get_last_files(root, pattern=r'_v\d{3}\.\w+', only_matching=False, ex_file=None, ex_dir=None, keep=1, verbose=False) -> list:
    '''Recursively get the last file(s) (when there are multiple versions) in the passed directory
    root -> str: Filepath of the folder to scan.
    pattern -> str: Regex pattern used to group files.
    only_matching -> bool: Discard files that are not matched by the regex pattern.
    ex_file -> list: List of fnmatch patterns to exclude files.
    ex_dir -> list: List of fnmatch patterns of directory names to skip.
    keep -> int: Number of last versions to keep when there are multiple versioned files (e.g. 1 keeps only the last).
    verbose -> bool: Print info in the console.
    '''
files = []
if ex_file is None:
all_items = [f for f in os.scandir(root)]
else:
all_items = [f for f in os.scandir(root) if not is_exclude(f.name, ex_file)]
allfiles = [f for f in all_items if f.is_file()]
    # Sort so that files sharing the same prefix end up adjacent for groupby
    allfiles.sort(key=lambda x: x.name)
    dirs = [f for f in all_items if f.is_dir()]
    for i in range(len(allfiles)-1, -1, -1):  # iterate on indices in reverse so items can be popped safely
if not re.search(pattern, allfiles[i].name):
if only_matching:
allfiles.pop(i)
else:
files.append(allfiles.pop(i).path)
    # separate remaining files into lists grouped by prefix
    lilist = [list(v) for k, v in groupby(allfiles, key=lambda x: re.split(pattern, x.name)[0])]
    # keep only the last item(s) of each sorted group
    for group in lilist:
        versions = sorted(group, key=lambda x: x.name)[-keep:]  # drop older versions
        for f in versions:
            files.append(f.path)
        if verbose and len(group) > 1:
            print(f'{root}: keep {str([x.name for x in versions])} out of {len(group)} elements')
for d in dirs: # recursively treat all detected directory
if ex_dir and is_exclude(d.name, ex_dir):
# skip folder with excluded name
continue
        files += get_last_files(
            d.path, pattern=pattern, only_matching=only_matching, ex_file=ex_file, ex_dir=ex_dir,
            keep=keep, verbose=verbose)
return sorted(files)
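
# Illustrative usage sketch for get_last_files(); the root folder and patterns
# are hypothetical (commented out, not executed).
#     last_blends = get_last_files(
#         '/projects/library', only_matching=True,
#         ex_file=['*.tmp'], ex_dir=['backup*'], keep=1, verbose=True)
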
def copy_file(src, dst, only_new=False, only_recent=False):
    '''Copy a single file, optionally skipping existing or more recent destinations'''
    src, dst = Path(src), Path(dst)
    if dst.exists():
        if only_new:
            return
        elif only_recent and dst.stat().st_mtime >= src.stat().st_mtime:
            return
    dst.parent.mkdir(exist_ok=True, parents=True)
    print(f'Copy file from {src} to {dst}')
if platform.system() == 'Windows':
subprocess.call(['copy', str(src), str(dst)], shell=True)
else:
subprocess.call(['cp', str(src), str(dst)])
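
# Illustrative usage sketch for copy_file(); the paths are hypothetical
# (commented out, not executed).
#     copy_file('/projects/library/chair.blend', '/tmp/library/chair.blend', only_recent=True)
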
def copy_dir(src, dst, only_new=False, only_recent=False, excludes=('.*',), includes=()):
    '''Copy a file or a directory tree, filtering entries with fnmatch include/exclude patterns'''
    src, dst = Path(src), Path(dst)
    if includes:
        includes = r'|'.join([fnmatch.translate(x) for x in includes])
    if excludes:
        excludes = r'|'.join([fnmatch.translate(x) for x in excludes])
    if src.is_dir():
        dst.mkdir(exist_ok=True, parents=True)
    else:
        dst.parent.mkdir(exist_ok=True, parents=True)
if src.is_file():
copy_file(src, dst, only_new=only_new, only_recent=only_recent)
elif src.is_dir():
src_files = list(src.rglob('*'))
if excludes:
src_files = [f for f in src_files if not re.match(excludes, f.name)]
if includes:
src_files = [f for f in src_files if re.match(includes, f.name)]
dst_files = [dst/f.relative_to(src) for f in src_files]
        for src_file, dst_file in zip(src_files, dst_files):
if src_file.is_dir():
dst_file.mkdir(exist_ok=True, parents=True)
else:
copy_file(src_file, dst_file, only_new=only_new, only_recent=only_recent)
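
# Illustrative usage sketch for copy_dir(): mirror a folder while skipping
# hidden files; the paths are hypothetical (commented out, not executed).
#     copy_dir('/projects/library', '/tmp/library', only_new=True, excludes=['.*'])
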
def open_file(filepath, select=False):
'''Open a filepath inside the os explorer'''
if platform.system() == 'Darwin': # macOS
cmd = ['open']
elif platform.system() == 'Windows': # Windows
cmd = ['explorer']
if select:
cmd += ['/select,']
else: # linux variants
cmd = ['xdg-open']
if select:
cmd = ['nemo']
cmd += [str(filepath)]
subprocess.Popen(cmd)
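
# Illustrative usage sketch for open_file(): reveal a file in the OS file
# browser; the path is hypothetical (commented out, not executed).
#     open_file('/projects/library/chair.blend', select=True)
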
def open_blender_file(filepath=None):
    '''Open a blend file (or the current one) in a new Blender instance, reusing the current command line'''
    import bpy

    filepath = filepath or bpy.data.filepath
    cmd = list(sys.argv)  # copy so the running instance's argv is not modified
    # if no filepath, use the command as is to reopen blender
    if filepath != '':
        if len(cmd) > 1 and cmd[1].endswith('.blend'):
            cmd[1] = str(filepath)
        else:
            cmd.insert(1, str(filepath))
    subprocess.Popen(cmd)

def read_file(path):
'''Read a file with an extension in (json, yaml, yml, txt)'''
exts = ('.json', '.yaml', '.yml', '.txt')
    if not path:
        print('Trying to read an empty file path')
        return
    path = Path(path)
    if not path.exists():
        print('File does not exist:', path)
        return
    if path.suffix.lower() not in exts:
        print(f'Cannot read file {path}, extension must be in {exts}')
        return
txt = path.read_text()
data = None
if path.suffix.lower() in ('.yaml', '.yml'):
        yaml = install_module('yaml', package_name='PyYAML')  # import name differs from the pip package name
try:
data = yaml.safe_load(txt)
except Exception:
print(f'Could not load yaml file {path}')
return
elif path.suffix.lower() == '.json':
try:
data = json.loads(txt)
except Exception:
print(f'Could not load json file {path}')
return
else:
data = txt
return data
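
# Illustrative usage sketch for read_file(); the path is hypothetical
# (commented out, not executed).
#     settings = read_file('/projects/library/settings.json')
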
def write_file(path, data, indent=4):
    '''Write data to a file with an extension in (json, yaml, yml, txt)'''
    exts = ('.json', '.yaml', '.yml', '.txt')
    if not path:
        print('Trying to write to an empty file path')
        return
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    if path.suffix.lower() not in exts:
        print(f'Cannot write file {path}, extension must be in {exts}')
        return
if path.suffix.lower() in ('.yaml', '.yml'):
        yaml = install_module('yaml', package_name='PyYAML')  # import name differs from the pip package name
try:
path.write_text(yaml.dump(data), encoding='utf8')
except Exception as e:
print(e)
print(f'Could not write yaml file {path}')
return
elif path.suffix.lower() == '.json':
try:
path.write_text(json.dumps(data, indent=indent), encoding='utf8')
except Exception as e:
print(e)
print(f'Could not write json file {path}')
return
    else:
        path.write_text(data, encoding='utf8')
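
# Illustrative round-trip sketch for write_file() and read_file(); the path is
# hypothetical (commented out, not executed).
#     write_file('/tmp/library/catalog.yml', {'chairs': ['chair_v001']})
#     read_file('/tmp/library/catalog.yml')   # -> {'chairs': ['chair_v001']}
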
def synchronize(src, dst, only_new=False, only_recent=False, clear=False):
    '''Run the sibling synchronize.py script in a background process to mirror src into dst'''
    try:
        if clear and Path(dst).exists():
            shutil.rmtree(dst)
        script = Path(__file__).parent / 'synchronize.py'
        cmd = [
            sys.executable,
            str(script),
'--src', str(src),
'--dst', str(dst),
'--only-new', json.dumps(only_new),
'--only-recent', json.dumps(only_recent),
]
subprocess.Popen(cmd)
except Exception as e:
print(e)
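
# Illustrative usage sketch for synchronize(): spawns the sibling synchronize.py
# script in the background; the paths are hypothetical (commented out, not executed).
#     synchronize('/projects/library', '/tmp/library', only_recent=True)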