You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
o3de/Tools/LyTestTools/ly_test_tools/environment/file_system.py

449 lines
15 KiB
Python

"""
Copyright (c) Contributors to the Open 3D Engine Project
SPDX-License-Identifier: Apache-2.0 OR MIT
File system related functions.
"""
import errno
import glob
import logging
import os
import psutil
import shutil
import stat
import sys
import tarfile
import time
import zipfile
import ly_test_tools.environment.process_utils as process_utils
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Binary size units expressed in bytes; used to convert byte counts for log and error messages.
ONE_KIB = 1024
ONE_MIB = 1024 * ONE_KIB
ONE_GIB = 1024 * ONE_MIB
def check_free_space(dest, required_space, msg):
    """
    Verify that the disk holding dest has at least required_space bytes free.
    :param dest: Path on the disk to inspect
    :param required_space: Number of bytes that must be available
    :param msg: Prefix of the error message; the free and required sizes (in GiB) are appended to it
    :raises IOError: with errno.ENOSPC when the free space is smaller than required_space
    """
    available = psutil.disk_usage(dest).free
    if available >= required_space:
        return
    shortage_report = f'{msg} {available / ONE_GIB:.2f} GiB vs {required_space / ONE_GIB:.2f} GiB'
    raise IOError(errno.ENOSPC, shortage_report)
def safe_makedirs(dest_path):
    """
    Create dest_path (including intermediate directories), tolerating the case where it already exists.
    An OSError is swallowed only when it reports that the directory exists (EEXIST), or when Windows reports
    EACCES for a drive root such as 'C:\\'. Any other OSError is logged at debug level and re-raised.
    :param dest_path: Directory path to create
    :raises OSError: when the directory could not be created for any other reason
    """
    try:
        logger.debug(f'Creating directory "{dest_path}"')
        os.makedirs(dest_path)
    except OSError as e:
        already_exists = e.errno == errno.EEXIST
        # Windows raises EACCES instead of EEXIST when asked to make a directory at a drive root.
        is_windows_drive_root = (
            e.errno == errno.EACCES and sys.platform == 'win32' and dest_path.endswith(':\\'))
        if not (already_exists or is_windows_drive_root):
            logger.debug(f'Could not create directory: "{dest_path}".')
            raise
def get_newest_file_in_dir(path, exts):
    """
    Return the entry in a directory with the most recent ctime among those matching the given patterns.
    :param path: Directory to search
    :param exts: Iterable of glob patterns (e.g. '*.log'); each one is joined onto path and globbed
    :return: Path of the match with the largest os.path.getctime value, or None if nothing matched
    """
    candidates = [match for pattern in exts for match in glob.iglob(os.path.join(path, pattern))]
    if not candidates:
        # May not be any files in that directory.
        return None
    return max(candidates, key=os.path.getctime)
def remove_path_and_extension(src):
    """
    Reduce src to its bare file name: the directory part and the last extension are dropped.
    Used in unzip and untgz.
    Example:
        C:\\packages\\lumberyard-XXXX.zip would become lumberyard-XXXX
    :param src: Path to a file
    :return: The base name of src without its final extension
    """
    stem, _extension = os.path.splitext(os.path.basename(src))
    return stem
def set_up_decompression(full_size, dest, src, force, allow_exists=False):
    """
    Used in unzip and untgz, will check whether the dest has enough space and computes the new build path.
    :param full_size: Number of bytes the extracted content is expected to occupy
    :param dest: Target directory to extract into
    :param src: Location of the compressed package
    :param force: Boolean determining whether to overwrite the build if it already exists
    :param allow_exists: Boolean selecting the log level used when the build already exists: INFO when True,
        CRITICAL when False
    :return: A tuple containing the extraction path and a bool telling whether an existing build was found
        there and must not be overwritten.
    :raises IOError: (from check_free_space) when dest lacks full_size plus one GiB of free space
    """
    # Check free space leaving at least a GiB free.
    check_free_space(dest, full_size + ONE_GIB, 'Not enough space to safely extract: ')
    dst_path = os.path.join(dest, remove_path_and_extension(src))
    # Cannot easily compare the zip contents to existing dir. Assumes builds of the same name are identical.
    exists = os.path.exists(dst_path) and not force
    if exists:
        # Only log critical if the user wants early termination of the command if the build exists.
        # Use the numeric level constants directly rather than the name-to-number lookup of getLevelName.
        level = logging.INFO if allow_exists else logging.CRITICAL
        logger.log(level, f'Found existing {dst_path}. Will not overwrite.')
    return dst_path, exists
def unzip(dest, src, force=False, allow_exists=False):
    """
    Decompress src_path\\name.zip to the dest directory in a subdirectory called name.
    Example:
        dest = D:\\builds
        src = C:\\packages\\lumberyard-XXXX.zip
    Result:
        C:\\packages\\lumberyard-XXXX.zip decompressed to D:\\builds\\lumberyard-XXXX
    :param dest: Directory that will contain the extracted subdirectory
    :param src: Path to the zip archive
    :param force: When True, extract even if the target subdirectory already exists
    :param allow_exists: Forwarded to set_up_decompression to choose the log level for an existing build
    :return: Path of the extraction directory (whether freshly extracted or already present)
    """
    with zipfile.ZipFile(src, 'r') as archive:
        # The uncompressed size of every member tells us how much room is needed.
        full_size = sum(member.file_size for member in archive.infolist())
        dst_path, exists = set_up_decompression(full_size, dest, src, force, allow_exists)
        if exists:
            return dst_path
        # Unzip and return final path.
        started_at = time.time()
        archive.extractall(dst_path)
        # Guard against a zero duration so the throughput division below cannot fail.
        elapsed = (time.time() - started_at) or 0.01
        logger.info(
            f'Extracted {full_size / ONE_GIB:.2f} GiB '
            f'from "{src}" to "{dst_path}" in '
            f'{elapsed / 60:2.2f} minutes, '
            f'at {(full_size / ONE_MIB) / elapsed:.2f} MiB/s.')
        return dst_path
def untgz(dest, src, exact_tgz_size=False, force=False, allow_exists=False):
    """
    Decompress src_path\\name.tgz to the dest directory in a subdirectory called name.
    Example:
        dest = D:\\builds
        src = C:\\packages\\lumberyard-XXXX.tgz
    Result:
        C:\\packages\\lumberyard-XXXX.tgz decompressed to D:\\builds\\lumberyard-XXXX
    :param dest: Directory that will contain the extracted subdirectory
    :param src: Path to the tar archive
    :param exact_tgz_size: When True, sum the size of every archive member; otherwise estimate the extracted
        size as 4.5 times the size of the archive file
    :param force: When True, extract even if the target subdirectory already exists
    :param allow_exists: Forwarded to set_up_decompression to choose the log level for an existing build
    :return: Path of the extraction directory (whether freshly extracted or already present)
    """
    with tarfile.open(src) as tar_file:
        # Determine exact size of tar if instructed, otherwise estimate.
        if exact_tgz_size:
            full_size = sum(tarinfo.size for tarinfo in tar_file)
        else:
            full_size = os.stat(src).st_size * 4.5
        dst_path, exists = set_up_decompression(full_size, dest, src, force, allow_exists)
        if exists:
            return dst_path
        # Extract it and return final path.
        # NOTE(review): extractall trusts the member paths stored in the archive; presumably src is always a
        # trusted build package - confirm before using this on archives from unknown sources.
        start_time = time.time()
        tar_file.extractall(dst_path)
        secs = time.time() - start_time
        if secs == 0:
            # Avoid dividing by zero when computing the throughput below.
            secs = 0.01
        # The size is divided by ONE_GIB, so it is reported in GiB (matching unzip).
        logger.info(
            f'Extracted {full_size / ONE_GIB:.2f} GiB '
            f'from {src} to {dst_path} '
            f'in {secs / 60:2.2f} minutes, '
            f'at {(full_size / ONE_MIB) / secs:.2f} MiB/s.')
        return dst_path
def change_permissions(path_list, perms):
    """
    Apply perms to every directory and file found beneath a directory.
    :param path_list: Root directory handed to os.walk; the root itself is not modified, only its contents
    :param perms: Permission bits passed to os.chmod
    :return: True if every chmod succeeded, False (after logging a warning) as soon as one raises OSError
    """
    try:
        for parent, subdirs, filenames in os.walk(path_list):
            # Directories first, then files, for each level visited.
            for entry in subdirs + filenames:
                os.chmod(os.path.join(parent, entry), perms)
    except OSError as e:
        logger.warning(f"Couldn't change permission : Error: {e.filename} - {e.strerror}.")
        return False
    return True
def unlock_file(file_name):
    """
    Given a file name, unlocks the file for write access.
    The owner-write bit is added to the file's current mode, so the remaining permission bits are preserved
    instead of being replaced by a write-only mode.
    :param file_name: Path to a file
    :return: True if the file was not writable and write access was added, False if it was already writable
    """
    if os.access(file_name, os.W_OK):
        logger.info(f'File {file_name} not write locked. Unlocking file not necessary.')
        return False
    current_mode = stat.S_IMODE(os.stat(file_name).st_mode)
    os.chmod(file_name, current_mode | stat.S_IWRITE)
    logger.warning(f'Clearing write lock for file {file_name}.')
    return True
def lock_file(file_name):
    """
    Given a file name, lock write access to the file.
    All write bits are cleared from the file's current mode and owner-read is ensured, so the remaining
    permission bits are preserved instead of being replaced by an owner-read-only mode.
    :param file_name: Path to a file
    :return: True if the file was writable and write access was removed, False if it was already locked
    """
    if not os.access(file_name, os.W_OK):
        logger.info(f'File {file_name} already locked. Locking file not necessary.')
        return False
    write_bits = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
    current_mode = stat.S_IMODE(os.stat(file_name).st_mode)
    os.chmod(file_name, (current_mode & ~write_bits) | stat.S_IREAD)
    logger.warning(f'Write locking file {file_name}')
    return True
def remove_symlink(path):
    """
    Remove a directory link (or an empty directory) without touching the content it points to.
    Non-empty real directories are left in place silently.
    :param path: Path to the link or directory
    :raises OSError: when removal fails for a reason other than the directory being non-empty
    """
    try:
        # Rmdir can delete a symlink without following the symlink to the original content
        os.rmdir(path)
    except OSError as e:
        if e.errno == errno.ENOTEMPTY:
            # A real directory that still has content: nothing to do.
            return
        if os.path.islink(path):
            # rmdir refuses symlinks on POSIX (ENOTDIR); unlink removes the link itself, never its target.
            os.unlink(path)
            return
        raise
def remove_symlinks(path, remove_root=False):
    """
    Walk path and hand every directory entry found at each level to remove_symlink.
    :param path: Root directory to walk
    :param remove_root: When True, remove_symlink is also applied to path itself after the walk
    """
    for parent, subdirs, _files in os.walk(path):
        for subdir in subdirs:
            remove_symlink(os.path.join(parent, subdir))
    if remove_root:
        remove_symlink(path)
def delete(file_list, del_files, del_dirs):
    """
    Remove the given paths. Directories are removed (recursively) only when del_dirs is set, and files only
    when del_files is set; paths matching neither case are skipped.
    :param file_list: A string or an array of artifact paths to delete
    :param del_files: True if delete should delete files
    :param del_dirs: True if delete should delete directories
    :return: True if no OSError occurred, False as soon as one deletion fails (remaining paths are skipped)
    """
    targets = [file_list] if isinstance(file_list, str) else file_list
    for target in targets:
        logger.info(f'Deleting "{target}"')
        try:
            if del_dirs and os.path.isdir(target):
                # Open up permissions on the contents so nothing blocks the removal.
                change_permissions(target, 0o777)
                # Remove all symlinks before rmtree blows them away
                remove_symlinks(target)
                shutil.rmtree(target)
            elif del_files and os.path.isfile(target):
                os.chmod(target, 0o777)
                os.remove(target)
        except OSError as e:
            logger.warning(f'Could not delete {e.filename} : Error: {e.strerror}.')
            return False
    return True
def create_backup(source, backup_dir):
    """
    Copy a single file into backup_dir under its own name with '.bak' appended.
    e.g.: foo.txt is stored as backup_dir/foo.txt.bak
    Problems are logged rather than raised: an invalid backup_dir, a missing source, or a failing copy all
    leave the function returning None. An existing backup is overwritten after a warning.
    :param source: Full path to file to backup
    :param backup_dir: Path to the directory to store backup.
    """
    if not (backup_dir and os.path.isdir(backup_dir)):
        logger.error(f'Cannot create backup due to invalid backup directory {backup_dir}')
        return
    if not os.path.exists(source):
        logger.warning(f'Source file {source} does not exist, aborting backup creation.')
        return
    dest = os.path.join(backup_dir, f'{os.path.basename(source)}.bak')
    logger.info(f'Saving backup of {source} in {dest}')
    if os.path.exists(dest):
        logger.warning(f'Backup file already exists at {dest}, it will be overwritten.')
    try:
        shutil.copy(source, dest)
    except Exception:  # intentionally broad
        logger.warning('Could not create backup, exception occurred while copying.', exc_info=True)
def restore_backup(original_file, backup_dir):
    """
    Copy backup_dir/<name>.bak back over original_file. Works with a single file only.
    Problems are logged rather than raised: an invalid backup_dir, a missing backup, or a failing copy all
    leave the function returning None.
    :param original_file: Full path to file to overwrite.
    :param backup_dir: Path to the directory storing the backup.
    """
    if not (backup_dir and os.path.isdir(backup_dir)):
        logger.error(f'Cannot restore backup due to invalid or nonexistent directory {backup_dir}.')
        return
    backup = os.path.join(backup_dir, f'{os.path.basename(original_file)}.bak')
    if not os.path.exists(backup):
        logger.warning(f'Backup file {backup} does not exist, aborting backup restoration.')
        return
    logger.info(f'Restoring backup of {original_file} from {backup}')
    try:
        shutil.copy(backup, original_file)
    except Exception:  # intentionally broad
        logger.warning('Could not restore backup, exception occurred while copying.', exc_info=True)
def delete_oldest(path_glob, keep_num, del_files=True, del_dirs=False):
    """
    Delete the oldest paths matching a glob, keeping the keep_num entries with the most recent ctime.
    :param path_glob: Glob pattern selecting the candidate paths
    :param keep_num: Number of newest matches to leave in place
    :param del_files: Forwarded to delete; True if files should be deleted
    :param del_dirs: Forwarded to delete; True if directories should be deleted
    :return: The result of delete for the paths beyond the first keep_num
    """
    logger.info(
        f'Deleting dirs: {del_dirs} files: {del_files} "{path_glob}", keeping {keep_num}')
    newest_first = sorted(glob.iglob(path_glob), key=os.path.getctime, reverse=True)
    return delete(newest_first[keep_num:], del_files, del_dirs)
def make_junction(dst, src):
    """
    Create a directory junction on Windows or a hardlink on macOS.
    :param dst: Path of the link to create
    :param src: Existing directory the link refers to
    :raises IOError: when src is not a directory, or on any platform other than win32 and darwin
    """
    if not os.path.isdir(src):
        raise IOError(f"{src} is not a directory")
    platform = sys.platform
    if platform == 'win32':
        # mklink is a cmd.exe builtin, hence shell=True.
        process_utils.check_output(["mklink", "/J", dst, src], shell=True)
    elif platform == 'darwin':
        # NOTE(review): ln takes the existing source first and the new link name second, while this call
        # passes dst first - confirm the argument order (and that a hard link to a directory is intended).
        process_utils.check_output(["ln", dst, src])
    else:
        raise IOError(f"Unsupported operating system: {sys.platform}")
def split_path_where_exists(path):
    """
    Splits a path into 2 parts: the longest leading part that exists and the trailing part that doesn't.
    :param path: the path to split
    :return: a tuple (exists_part, remainder) where exists_part is the part that exists and remainder is the
        part that doesn't. Either part may be None.
    """
    existing = path
    missing = None
    while not os.path.exists(existing):
        parent, leaf = os.path.split(existing)
        # At a root the tail is empty and the head is the root itself; keep the root as the missing piece.
        leaf = leaf or parent
        missing = leaf if missing is None else os.path.join(leaf, missing)
        if parent == existing:
            # Cannot climb any higher: nothing along the path exists.
            return None, missing
        existing = parent
    return existing, missing
def sanitize_file_name(file_name):
    """
    Replaces unsupported file name characters with a double underscore.
    Unsupported characters are backslash, slash, space, colon, asterisk, angle brackets, double quote, pipe,
    question mark, and the control characters with code points 0 through 31.
    :param file_name: The target file name to sanitize
    :return: The sanitized name
    """
    unsupported = set('\\/ :*<>"|?')
    unsupported.update(chr(code) for code in range(32))
    return ''.join('__' if char in unsupported else char for char in file_name)
def reduce_file_name_length(file_name, max_length):
    """
    Trim characters off the end of file_name so it is no longer than max_length.
    :param file_name: string for the file name to reduce in length.
    :param max_length: the length to reduce file_name to.
    :return: file_name unchanged when it already fits, otherwise file_name with the excess characters removed
        from its end.
    """
    excess = len(file_name) - max_length
    return file_name[:-excess] if excess > 0 else file_name
def find_ancestor_file(target_file_name, start_path=None):
    """
    Find a file with the given name in the ancestor directories by walking up the starting path until the file
    is found.
    :param target_file_name: Name of the file to find.
    :param start_path: Optional path to start looking for the file. Defaults to the current working directory
        at the time of the call.
    :return: Path to the file or None if not found.
    """
    if start_path is None:
        # Resolve the working directory per call; a default of os.getcwd() in the signature would be frozen
        # at import time and ignore any later change of directory.
        start_path = os.getcwd()
    current_path = os.path.normpath(start_path)
    candidate_path = os.path.join(current_path, target_file_name)
    # Limit the number of directories to traverse, to avoid infinite loop in path cycles
    for _ in range(15):
        if os.path.exists(candidate_path):
            # Found the file we wanted
            break
        parent_path = os.path.dirname(current_path)
        if parent_path == current_path:
            # Only true when we are at the directory root, can't keep searching
            break
        candidate_path = os.path.join(parent_path, target_file_name)
        current_path = parent_path
    if not os.path.exists(candidate_path):
        logger.warning(f'The candidate path {candidate_path} does not exist.')
        return None
    return candidate_path