""" All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates or its licensors. For complete copyright and license terms please see the LICENSE at the root of this distribution (the "License"). All use of this software is governed by the License, or, if provided, by the license below or the license accompanying this file. Do not remove or modify any license notices. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. File system related functions. """ import errno import glob import logging import os import psutil import shutil import stat import sys import tarfile import time import zipfile import ly_test_tools.environment.process_utils as process_utils logger = logging.getLogger(__name__) ONE_KIB = 1024 ONE_MIB = 1024 * ONE_KIB ONE_GIB = 1024 * ONE_MIB def check_free_space(dest, required_space, msg): """ Make sure the required space is available on destination, raising an IOError if there is not. """ free_space = psutil.disk_usage(dest).free if free_space < required_space: raise IOError( errno.ENOSPC, f'{msg} {free_space / ONE_GIB:.2f} GiB ' f'vs {required_space / ONE_GIB:.2f} GiB') def safe_makedirs(dest_path): """ This allows an OSError in the case the directory cannot be created, which is logged but does not propagate.""" try: logger.debug(f'Creating directory "{dest_path}"') os.makedirs(dest_path) except OSError as e: if e.errno == errno.EEXIST: pass elif e.errno == errno.EACCES and sys.platform == 'win32' and dest_path.endswith(':\\'): # In this case, windows will raise EACCES instead of EEXIST if you try to make a directory at the root. pass else: logger.debug(f'Could not create directory: "{dest_path}".') raise def get_newest_file_in_dir(path, exts): """ Find the newest file in a directory, matching the extensions provided. """ dir_iter = [] for ext in exts: dir_iter.extend(glob.iglob(os.path.join(path, ext))) try: return max(dir_iter, key=os.path.getctime) except ValueError: # May not be any files in that directory. return None def remove_path_and_extension(src): """ Given a src, will strip off the path and the extension. Used in unzip and untgz Example: C:\\packages\\lumberyard-XXXX.zip would become lumberyard-XXX """ src_name = os.path.basename(src) src_no_extension, _ = os.path.splitext(src_name) return src_no_extension def set_up_decompression(full_size, dest, src, force, allow_exists=False): """ Used in unzip and untgz, will check whether the dest has enough space and creates the new build path. :param full_size: Size of zipped package :param dest: Target unzip location :param src: Location of the zipped package :param force: Boolean determining whether to overwrite the build if it already exists :param allow_exists: Boolean determining whether to log critical if the build already exists :return: A tuple containing the unzipped build path and a bool determining whether the build already exists. """ exists = False # Check free space leaving at least a GiB free. check_free_space(dest, full_size + ONE_GIB, 'Not enough space to safely extract: ') dst_path = os.path.join(dest, remove_path_and_extension(src)) # Cannot easily compare the zip contents to existing dir. Assumes builds of the same name are identical. if os.path.exists(dst_path) and not force: exists = True # Only log critical if the user wants early termination of the command if the build exists if allow_exists: level = logging.getLevelName('INFO') else: level = logging.getLevelName('CRITICAL') logger.log(level, f'Found existing {dst_path}. Will not overwrite.') return dst_path, exists return dst_path, exists def unzip(dest, src, force=False, allow_exists=False): """ decompress src_path\\name.zip to the dest directory in a subdirectory called name. Will strip assets names for lumberyard builds. Example: dest = D:\\builds src = C:\\packages\\lumberyard-XXXX.zip Result: C:\\packages\\lumberyard-XXXX.zip decompressed to D:\\builds\\lumberyard-XXXX src can be any file, but lumberyard asset builds will have their name shortened to match the build they belong to. """ with zipfile.ZipFile(src, 'r') as zip_file: full_size = sum(info.file_size for info in zip_file.infolist()) dst_path, exists = set_up_decompression(full_size, dest, src, force, allow_exists) if exists: return dst_path # Unzip and return final path. start_time = time.time() zip_file.extractall(dst_path) secs = time.time() - start_time if secs == 0: secs = 0.01 logger.info( f'Extracted {full_size / ONE_GIB:.2f} GiB ' f'from "{src}" to "{dst_path}" in ' f'{secs / 60:2.2f} minutes, ' f'at {(full_size / ONE_MIB) / secs:.2f} MiB/s.') return dst_path def untgz(dest, src, exact_tgz_size=False, force=False, allow_exists=False): """ decompress src_path\\name.tgz to the dest directory in a subdirectoy called name. Will strip assets names for lumberyard builds. Example: dest = D:\\builds src = C:\\packages\\lumberyard-XXXX.tgz Result: C:\\packages\\lumberyard-XXXX.tgz decompressed to D:\\builds\\lumberyard-XXXX src can be any file, but lumberyard asset builds will have their name shortened to match the build they belong to. """ with tarfile.open(src) as tar_file: # Determine exact size of tar if instructed, otherwise estimate. if exact_tgz_size: full_size = 0 for tarinfo in tar_file: full_size += tarinfo.size else: full_size = os.stat(src).st_size * 4.5 dst_path, exists = set_up_decompression(full_size, dest, src, force, allow_exists) if exists: return dst_path # Extract it and return final path. start_time = time.time() tar_file.extractall(dst_path) secs = time.time() - start_time if secs == 0: secs = 0.01 logger.info( f'Extracted {full_size / ONE_GIB:.2f} MiB ' f'from {src} to {dst_path} ' f'in {secs / 60:2.2f} minutes, ' f'at {(full_size / ONE_MIB) / secs:.2f} MiB/s.') return dst_path def change_permissions(path_list, perms): """ Changes the permissions of the files and folders defined in the file list """ try: for root, dirs, files in os.walk(path_list): for dir_name in dirs: os.chmod(os.path.join(root, dir_name), perms) for file_name in files: os.chmod(os.path.join(root, file_name), perms) except OSError as e: logger.warning(f"Couldn't change permission : Error: {e.filename} - {e.strerror}.") return False else: return True def unlock_file(file_name): """ Given a file name, unlocks the file for write access. :param file_name: Path to a file :return: True if unlock succeeded, else False """ if not os.access(file_name, os.W_OK): os.chmod(file_name, stat.S_IWRITE) logger.warning(f'Clearing write lock for file {file_name}.') return True else: logger.info(f'File {file_name} not write locked. Unlocking file not necessary.') return False def lock_file(file_name): """ Given a file name, lock write access to the file. :param file_name: Path to a file :return: True if lock succeeded, else False """ if os.access(file_name, os.W_OK): os.chmod(file_name, stat.S_IREAD) logger.warning(f'Write locking file {file_name}') return True else: logger.info(f'File {file_name} already locked. Locking file not necessary.') return False def remove_symlink(path): try: # Rmdir can delete a symlink without following the symlink to the original content os.rmdir(path) except OSError as e: if e.errno != errno.ENOTEMPTY: raise def remove_symlinks(path, remove_root=False): """ Removes all symlinks at the provided path and its subdirectories. """ for root, dirs, files in os.walk(path): for name in dirs: remove_symlink(os.path.join(root, name)) if remove_root: remove_symlink(path) def delete(file_list, del_files, del_dirs): """ Given a list of directory paths, delete will remove all subdirectories and files based on which flag is set, del_files or del_dirs. :param file_list: A string or an array of artifact paths to delete :param del_files: True if delete should delete files :param del_dirs: True if delete should delete directories :return: True if delete was successful """ if isinstance(file_list, str): file_list = [file_list] for file_to_delete in file_list: logger.info(f'Deleting "{file_to_delete}"') try: if del_dirs and os.path.isdir(file_to_delete): change_permissions(file_to_delete, 0o777) # Remove all symlinks before rmtree blows them away remove_symlinks(file_to_delete) shutil.rmtree(file_to_delete) elif del_files and os.path.isfile(file_to_delete): os.chmod(file_to_delete, 0o777) os.remove(file_to_delete) except OSError as e: logger.warning(f'Could not delete {e.filename} : Error: {e.strerror}.') return False return True def create_backup(source, backup_dir): """ Creates a backup of a single source file by creating a copy of it with the same name + '.bak' in backup_dir e.g.: foo.txt is stored as backup_dir/foo.txt.bak :param source: Full path to file to backup :param backup_dir: Path to the directory to store backup. """ if not backup_dir or not os.path.isdir(backup_dir): logger.error(f'Cannot create backup due to invalid backup directory {backup_dir}') return if not os.path.exists(source): logger.warning(f'Source file {source} does not exist, aborting backup creation.') return source_filename = os.path.basename(source) dest = os.path.join(backup_dir, f'{source_filename}.bak') logger.info(f'Saving backup of {source} in {dest}') if os.path.exists(dest): logger.warning(f'Backup file already exists at {dest}, it will be overwritten.') try: shutil.copy(source, dest) except Exception: # intentionally broad logger.warning('Could not create backup, exception occurred while copying.', exc_info=True) def restore_backup(original_file, backup_dir): """ Restores a backup file to its original location. Works with a single file only. :param original_file: Full path to file to overwrite. :param backup_dir: Path to the directory storing the backup. """ if not backup_dir or not os.path.isdir(backup_dir): logger.error(f'Cannot restore backup due to invalid or nonexistent directory {backup_dir}.') return source_filename = os.path.basename(original_file) backup = os.path.join(backup_dir, f'{source_filename}.bak') if not os.path.exists(backup): logger.warning(f'Backup file {backup} does not exist, aborting backup restoration.') return logger.info(f'Restoring backup of {original_file} from {backup}') try: shutil.copy(backup, original_file) except Exception: # intentionally broad logger.warning('Could not restore backup, exception occurred while copying.', exc_info=True) def delete_oldest(path_glob, keep_num, del_files=True, del_dirs=False): """ Delete oldest builds, keeping a specific number """ logger.info( f'Deleting dirs: {del_dirs} files: {del_files} "{path_glob}", keeping {keep_num}') paths = glob.iglob(path_glob) paths = sorted(paths, key=lambda fi: os.path.getctime(fi), reverse=True) return delete(paths[keep_num:], del_files, del_dirs) def make_junction(dst, src): """Create a directory junction on Windows or a hardlink on macOS.""" if not os.path.isdir(src): raise IOError(f"{src} is not a directory") elif sys.platform == 'win32': process_utils.check_output(["mklink", "/J", dst, src], shell=True) elif sys.platform == 'darwin': process_utils.check_output(["ln", dst, src]) else: raise IOError(f"Unsupported operating system: {sys.platform}") def split_path_where_exists(path): """ Splits a path into 2 parts: the part that exists and the part that doesn't. :param path: the path to split :return: a tuple (exists_part, remainder) where exists_part is the part that exists and remainder is the part that doesn't. Either part may be None. """ current = path remainder = None while True: if os.path.exists(current): return current, remainder next_, tail = os.path.split(current) tail = tail or next_ remainder = tail if remainder is None else os.path.join(tail, remainder) if next_ == current: break current = next_ return None, remainder def sanitize_file_name(file_name): """ Replaces unsupported file name characters with a double underscore :param file_name: The target file name to sanitize :return: The sanitized name """ return ''.join( '__' if c in ['\\', '/', ' ', ':', '*', '<', '>', '"', '|', '?'] + [chr(i) for i in range(32)] else c for c in file_name) def reduce_file_name_length(file_name, max_length): """ Reduces the length of the string file_name to match the length parameter. :param file_name: string for the file name to reduce in length. :param max_length: the length to reduce file_name to. :return: file name string with a maximum length matching max_length. """ reduce_amount = len(file_name) - max_length if len(file_name) > max_length: file_name = file_name[:-reduce_amount] return file_name def find_ancestor_file(target_file_name, start_path=os.getcwd()): """ Find a file with the given name in the ancestor directories by walking up the starting path until the file is found. :param target_file_name: Name of the file to find. :param start_path: Optional path to start looking for the file. :return: Path to the file or None if not found. """ current_path = os.path.normpath(start_path) candidate_path = os.path.join(current_path, target_file_name) # Limit the number of directories to traverse, to avoid infinite loop in path cycles for _ in range(15): if not os.path.exists(candidate_path): parent_path = os.path.dirname(current_path) if parent_path == current_path: # Only true when we are at the directory root, can't keep searching break candidate_path = os.path.join(parent_path, target_file_name) current_path = parent_path else: # Found the file we wanted break if not os.path.exists(candidate_path): logger.warning(f'The candidate path {candidate_path} does not exist.') return None return candidate_path