You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
o3de/Tools/LyTestTools/ly_test_tools/environment/file_system.py

450 lines
15 KiB
Python

"""
Copyright (c) Contributors to the Open 3D Engine Project.
For complete copyright and license terms please see the LICENSE at the root of this distribution.
SPDX-License-Identifier: Apache-2.0 OR MIT
File system related functions.
"""
import errno
import glob
import logging
import os
import psutil
import shutil
import stat
import sys
import tarfile
import time
import zipfile
import ly_test_tools.environment.process_utils as process_utils
logger = logging.getLogger(__name__)
ONE_KIB = 1024
ONE_MIB = 1024 * ONE_KIB
ONE_GIB = 1024 * ONE_MIB
def check_free_space(dest, required_space, msg):
""" Make sure the required space is available on destination, raising an IOError if there is not. """
free_space = psutil.disk_usage(dest).free
if free_space < required_space:
raise IOError(
errno.ENOSPC,
f'{msg} {free_space / ONE_GIB:.2f} GiB '
f'vs {required_space / ONE_GIB:.2f} GiB')
def safe_makedirs(dest_path):
""" This allows an OSError in the case the directory cannot be created, which is logged but does not propagate."""
try:
logger.debug(f'Creating directory "{dest_path}"')
os.makedirs(dest_path)
except OSError as e:
if e.errno == errno.EEXIST:
pass
elif e.errno == errno.EACCES and sys.platform == 'win32' and dest_path.endswith(':\\'):
# In this case, windows will raise EACCES instead of EEXIST if you try to make a directory at the root.
pass
else:
logger.debug(f'Could not create directory: "{dest_path}".')
raise
def get_newest_file_in_dir(path, exts):
""" Find the newest file in a directory, matching the extensions provided. """
dir_iter = []
for ext in exts:
dir_iter.extend(glob.iglob(os.path.join(path, ext)))
try:
return max(dir_iter, key=os.path.getctime)
except ValueError:
# May not be any files in that directory.
return None
def remove_path_and_extension(src):
"""
Given a src, will strip off the path and the extension. Used in unzip and untgz
Example:
C:\\packages\\lumberyard-XXXX.zip would become lumberyard-XXX
"""
src_name = os.path.basename(src)
src_no_extension, _ = os.path.splitext(src_name)
return src_no_extension
def set_up_decompression(full_size, dest, src, force, allow_exists=False):
"""
Used in unzip and untgz, will check whether the dest has enough space and creates the new build path.
:param full_size: Size of zipped package
:param dest: Target unzip location
:param src: Location of the zipped package
:param force: Boolean determining whether to overwrite the build if it already exists
:param allow_exists: Boolean determining whether to log critical if the build already exists
:return: A tuple containing the unzipped build path and a bool determining whether the build already exists.
"""
exists = False
# Check free space leaving at least a GiB free.
check_free_space(dest, full_size + ONE_GIB, 'Not enough space to safely extract: ')
dst_path = os.path.join(dest, remove_path_and_extension(src))
# Cannot easily compare the zip contents to existing dir. Assumes builds of the same name are identical.
if os.path.exists(dst_path) and not force:
exists = True
# Only log critical if the user wants early termination of the command if the build exists
if allow_exists:
level = logging.getLevelName('INFO')
else:
level = logging.getLevelName('CRITICAL')
logger.log(level, f'Found existing {dst_path}. Will not overwrite.')
return dst_path, exists
return dst_path, exists
def unzip(dest, src, force=False, allow_exists=False):
"""
decompress src_path\\name.zip to the dest directory in a subdirectory called name.
Will strip assets names for lumberyard builds.
Example:
dest = D:\\builds
src = C:\\packages\\lumberyard-XXXX.zip
Result:
C:\\packages\\lumberyard-XXXX.zip decompressed to D:\\builds\\lumberyard-XXXX
src can be any file, but lumberyard asset builds will have their name shortened to match the build they belong to.
"""
with zipfile.ZipFile(src, 'r') as zip_file:
full_size = sum(info.file_size for info in zip_file.infolist())
dst_path, exists = set_up_decompression(full_size, dest, src, force, allow_exists)
if exists:
return dst_path
# Unzip and return final path.
start_time = time.time()
zip_file.extractall(dst_path)
secs = time.time() - start_time
if secs == 0:
secs = 0.01
logger.info(
f'Extracted {full_size / ONE_GIB:.2f} GiB '
f'from "{src}" to "{dst_path}" in '
f'{secs / 60:2.2f} minutes, '
f'at {(full_size / ONE_MIB) / secs:.2f} MiB/s.')
return dst_path
def untgz(dest, src, exact_tgz_size=False, force=False, allow_exists=False):
"""
decompress src_path\\name.tgz to the dest directory in a subdirectoy called name.
Will strip assets names for lumberyard builds.
Example:
dest = D:\\builds
src = C:\\packages\\lumberyard-XXXX.tgz
Result:
C:\\packages\\lumberyard-XXXX.tgz decompressed to D:\\builds\\lumberyard-XXXX
src can be any file, but lumberyard asset builds will have their name shortened to match the build they belong to.
"""
with tarfile.open(src) as tar_file:
# Determine exact size of tar if instructed, otherwise estimate.
if exact_tgz_size:
full_size = 0
for tarinfo in tar_file:
full_size += tarinfo.size
else:
full_size = os.stat(src).st_size * 4.5
dst_path, exists = set_up_decompression(full_size, dest, src, force, allow_exists)
if exists:
return dst_path
# Extract it and return final path.
start_time = time.time()
tar_file.extractall(dst_path)
secs = time.time() - start_time
if secs == 0:
secs = 0.01
logger.info(
f'Extracted {full_size / ONE_GIB:.2f} MiB '
f'from {src} to {dst_path} '
f'in {secs / 60:2.2f} minutes, '
f'at {(full_size / ONE_MIB) / secs:.2f} MiB/s.')
return dst_path
def change_permissions(path_list, perms):
""" Changes the permissions of the files and folders defined in the file list """
try:
for root, dirs, files in os.walk(path_list):
for dir_name in dirs:
os.chmod(os.path.join(root, dir_name), perms)
for file_name in files:
os.chmod(os.path.join(root, file_name), perms)
except OSError as e:
logger.warning(f"Couldn't change permission : Error: {e.filename} - {e.strerror}.")
return False
else:
return True
def unlock_file(file_name):
"""
Given a file name, unlocks the file for write access.
:param file_name: Path to a file
:return: True if unlock succeeded, else False
"""
if not os.access(file_name, os.W_OK):
os.chmod(file_name, stat.S_IWRITE)
logger.warning(f'Clearing write lock for file {file_name}.')
return True
else:
logger.info(f'File {file_name} not write locked. Unlocking file not necessary.')
return False
def lock_file(file_name):
"""
Given a file name, lock write access to the file.
:param file_name: Path to a file
:return: True if lock succeeded, else False
"""
if os.access(file_name, os.W_OK):
os.chmod(file_name, stat.S_IREAD)
logger.warning(f'Write locking file {file_name}')
return True
else:
logger.info(f'File {file_name} already locked. Locking file not necessary.')
return False
def remove_symlink(path):
try:
# Rmdir can delete a symlink without following the symlink to the original content
os.rmdir(path)
except OSError as e:
if e.errno != errno.ENOTEMPTY:
raise
def remove_symlinks(path, remove_root=False):
""" Removes all symlinks at the provided path and its subdirectories. """
for root, dirs, files in os.walk(path):
for name in dirs:
remove_symlink(os.path.join(root, name))
if remove_root:
remove_symlink(path)
def delete(file_list, del_files, del_dirs):
"""
Given a list of directory paths, delete will remove all subdirectories and files based on which flag is set,
del_files or del_dirs.
:param file_list: A string or an array of artifact paths to delete
:param del_files: True if delete should delete files
:param del_dirs: True if delete should delete directories
:return: True if delete was successful
"""
if isinstance(file_list, str):
file_list = [file_list]
for file_to_delete in file_list:
logger.info(f'Deleting "{file_to_delete}"')
try:
if del_dirs and os.path.isdir(file_to_delete):
change_permissions(file_to_delete, 0o777)
# Remove all symlinks before rmtree blows them away
remove_symlinks(file_to_delete)
shutil.rmtree(file_to_delete)
elif del_files and os.path.isfile(file_to_delete):
os.chmod(file_to_delete, 0o777)
os.remove(file_to_delete)
except OSError as e:
logger.warning(f'Could not delete {e.filename} : Error: {e.strerror}.')
return False
return True
def create_backup(source, backup_dir):
"""
Creates a backup of a single source file by creating a copy of it with the same name + '.bak' in backup_dir
e.g.: foo.txt is stored as backup_dir/foo.txt.bak
:param source: Full path to file to backup
:param backup_dir: Path to the directory to store backup.
"""
if not backup_dir or not os.path.isdir(backup_dir):
logger.error(f'Cannot create backup due to invalid backup directory {backup_dir}')
return
if not os.path.exists(source):
logger.warning(f'Source file {source} does not exist, aborting backup creation.')
return
source_filename = os.path.basename(source)
dest = os.path.join(backup_dir, f'{source_filename}.bak')
logger.info(f'Saving backup of {source} in {dest}')
if os.path.exists(dest):
logger.warning(f'Backup file already exists at {dest}, it will be overwritten.')
try:
shutil.copy(source, dest)
except Exception: # intentionally broad
logger.warning('Could not create backup, exception occurred while copying.', exc_info=True)
def restore_backup(original_file, backup_dir):
"""
Restores a backup file to its original location. Works with a single file only.
:param original_file: Full path to file to overwrite.
:param backup_dir: Path to the directory storing the backup.
"""
if not backup_dir or not os.path.isdir(backup_dir):
logger.error(f'Cannot restore backup due to invalid or nonexistent directory {backup_dir}.')
return
source_filename = os.path.basename(original_file)
backup = os.path.join(backup_dir, f'{source_filename}.bak')
if not os.path.exists(backup):
logger.warning(f'Backup file {backup} does not exist, aborting backup restoration.')
return
logger.info(f'Restoring backup of {original_file} from {backup}')
try:
shutil.copy(backup, original_file)
except Exception: # intentionally broad
logger.warning('Could not restore backup, exception occurred while copying.', exc_info=True)
def delete_oldest(path_glob, keep_num, del_files=True, del_dirs=False):
""" Delete oldest builds, keeping a specific number """
logger.info(
f'Deleting dirs: {del_dirs} files: {del_files} "{path_glob}", keeping {keep_num}')
paths = glob.iglob(path_glob)
paths = sorted(paths, key=lambda fi: os.path.getctime(fi), reverse=True)
return delete(paths[keep_num:], del_files, del_dirs)
def make_junction(dst, src):
"""Create a directory junction on Windows or a hardlink on macOS."""
if not os.path.isdir(src):
raise IOError(f"{src} is not a directory")
elif sys.platform == 'win32':
process_utils.check_output(["mklink", "/J", dst, src], shell=True)
elif sys.platform == 'darwin':
process_utils.check_output(["ln", dst, src])
else:
raise IOError(f"Unsupported operating system: {sys.platform}")
def split_path_where_exists(path):
"""
Splits a path into 2 parts: the part that exists and the part that doesn't.
:param path: the path to split
:return: a tuple (exists_part, remainder) where exists_part is the part that exists and remainder is the part that
doesn't. Either part may be None.
"""
current = path
remainder = None
while True:
if os.path.exists(current):
return current, remainder
next_, tail = os.path.split(current)
tail = tail or next_
remainder = tail if remainder is None else os.path.join(tail, remainder)
if next_ == current:
break
current = next_
return None, remainder
def sanitize_file_name(file_name):
"""
Replaces unsupported file name characters with a double underscore
:param file_name: The target file name to sanitize
:return: The sanitized name
"""
return ''.join(
'__' if c in ['\\', '/', ' ', ':', '*', '<', '>', '"', '|', '?'] + [chr(i) for i in range(32)] else c for c
in file_name)
def reduce_file_name_length(file_name, max_length):
"""
Reduces the length of the string file_name to match the length parameter.
:param file_name: string for the file name to reduce in length.
:param max_length: the length to reduce file_name to.
:return: file name string with a maximum length matching max_length.
"""
reduce_amount = len(file_name) - max_length
if len(file_name) > max_length:
file_name = file_name[:-reduce_amount]
return file_name
def find_ancestor_file(target_file_name, start_path=os.getcwd()):
"""
Find a file with the given name in the ancestor directories by walking up the starting path until the file is found.
:param target_file_name: Name of the file to find.
:param start_path: Optional path to start looking for the file.
:return: Path to the file or None if not found.
"""
current_path = os.path.normpath(start_path)
candidate_path = os.path.join(current_path, target_file_name)
# Limit the number of directories to traverse, to avoid infinite loop in path cycles
for _ in range(15):
if not os.path.exists(candidate_path):
parent_path = os.path.dirname(current_path)
if parent_path == current_path:
# Only true when we are at the directory root, can't keep searching
break
candidate_path = os.path.join(parent_path, target_file_name)
current_path = parent_path
else:
# Found the file we wanted
break
if not os.path.exists(candidate_path):
logger.warning(f'The candidate path {candidate_path} does not exist.')
return None
return candidate_path