From 9141221843463fdedc28d3940c408c3412b6af3e Mon Sep 17 00:00:00 2001 From: sweeneys Date: Thu, 13 May 2021 14:43:30 -0700 Subject: [PATCH] Reduce AP race conditions and fix minor bugs --- .../ly_test_tools/launchers/platforms/base.py | 10 +--- .../ly_test_tools/o3de/asset_processor.py | 47 +++++++++---------- 2 files changed, 24 insertions(+), 33 deletions(-) diff --git a/Tools/LyTestTools/ly_test_tools/launchers/platforms/base.py b/Tools/LyTestTools/ly_test_tools/launchers/platforms/base.py index 36746b510f..1b0afb3efe 100755 --- a/Tools/LyTestTools/ly_test_tools/launchers/platforms/base.py +++ b/Tools/LyTestTools/ly_test_tools/launchers/platforms/base.py @@ -108,15 +108,7 @@ class Launcher(object): # Wait for the AssetProcessor to be open. if launch_ap: - timeout = 10 - self.workspace.asset_processor.start() - ly_test_tools.environment.waiter.wait_for( - lambda: ly_test_tools.environment.process_utils.process_exists( - name="AssetProcessor", ignore_extensions=True), - exc=ly_test_tools.launchers.exceptions.SetupError( - f'AssetProcessor never opened after {timeout} seconds'), - timeout=timeout - ) + self.workspace.asset_processor.start(connect_to_ap=True, connection_timeout=10) # verify connection self.workspace.asset_processor.wait_for_idle() log.debug('AssetProcessor started from calling Launcher.setup()') diff --git a/Tools/LyTestTools/ly_test_tools/o3de/asset_processor.py b/Tools/LyTestTools/ly_test_tools/o3de/asset_processor.py index 11cd1fc4ce..0983d2c45f 100644 --- a/Tools/LyTestTools/ly_test_tools/o3de/asset_processor.py +++ b/Tools/LyTestTools/ly_test_tools/o3de/asset_processor.py @@ -20,7 +20,7 @@ import time import tempfile import shutil import stat -from typing import List +from typing import List, Tuple import psutil import ly_test_tools @@ -45,6 +45,7 @@ ASSET_PROCESSOR_PLATFORM_MAP = { ASSET_PROCESSOR_SETTINGS_ROOT_KEY = '/Amazon/AssetProcessor/Settings' + class AssetProcessorError(Exception): """ Indicates that the AssetProcessor raised an error """ @@ -88,11 +89,16 @@ class AssetProcessor(object): def wait_for_idle(self, timeout=DEFAULT_TIMEOUT_SECONDS): """ Communicate with asset processor to request a response if either AP is currently idle, - or whenever it becomes idle + or that it becomes idle within the timeout """ + if not self.process_exists(): + logger.warning("Not currently managing a running Asset Processor, cannot request idle") + return False + self.send_message("waitforidle") result = self.read_message(read_timeout=timeout) - assert result == "idle" or not self.process_exists(), f"Couldn't get idle state from AP, message {result}" + assert self.process_exists(), "Asset Processor appears unexpectedly shut down, or has crashed" + assert result == "idle", f"Did not get idle state from AP, message was instead: {result}" return True def next_idle(self): @@ -103,9 +109,14 @@ class AssetProcessor(object): which will be picked up by the scanner but may take a couple seconds to begin, and you only want to hear back after it's done. """ + if not self.process_exists(): + logger.warning("Not currently managing a running Asset Processor, cannot request idle") + return + self.send_message("signalidle") result = self.read_message() - assert result == "idle" or not self.process_exists(), f"Couldn't get idle state from AP, message {result}" + assert self.process_exists(), "Asset Processor appears unexpectedly shut down, or has crashed" + assert result == "idle", f"Did not get idle state from AP, message was instead: {result}" def send_quit(self): """ @@ -160,8 +171,6 @@ class AssetProcessor(object): logger.warning(f"Failed to read message from with error {e}") return f"error_{e}" - - def read_control_port(self): return self.read_port_from_log("Control Port") @@ -185,7 +194,7 @@ class AssetProcessor(object): if port: logger.info(f"Read port type {port_type} : {port}") return port - except: + except Exception: # intentionally broad pass time.sleep(1) logger.warning(f"Failed to read port type {port_type}") @@ -203,7 +212,7 @@ class AssetProcessor(object): control_timeout = 60 try: return self.connect_socket("Control Connection", self.read_control_port, - set_port_method=self.set_control_connection, timeout=control_timeout) + set_port_method=self.set_control_connection, timeout=control_timeout) except AssetProcessorError as e: # We dont want a failure of our test socket connection to fail the entire test automatically. logger.error(f"Failed to connect control socket with error {e}") @@ -237,7 +246,7 @@ class AssetProcessor(object): if set_port_method is not None: set_port_method(connection_socket) return True, None - except Exception as e: # Purposefully broad + except Exception: # Purposefully broad # Short delay to prevent immediate failure due to slower starting applications such as debug builds time.sleep(0.01) if not connect_port or not self.using_temp_workspace(): @@ -261,7 +270,7 @@ class AssetProcessor(object): :return: None """ if not self._ap_proc: - logger.info("Attempting to quit AP but none running") + logger.warning("Attempting to quit AP but none running") return if not self._control_connection: @@ -319,7 +328,7 @@ class AssetProcessor(object): pass _, remaining = psutil.wait_procs(process_list, timeout=10, callback=term_success) for process in remaining: - logger.info(f"Killing: {this_process.name()} pid: {this_process.pid}") + logger.info(f"Killing: {process.name()} pid: {process.pid}") process.kill() logger.info("Finished terminating asset processor") self._ap_proc = None @@ -500,22 +509,17 @@ class AssetProcessor(object): def build_ap_command(self, ap_path, fastscan=True, platforms=None, extra_params=None, add_gem_scan_folders=None, add_config_scan_folders=None, scan_folder_pattern=None): - # type: (float, bool) -> bool """ Launch asset processor batch and wait for it to complete or until the timeout expires. Returns true on success and False if an error is reported. :param fastscan: Enable "zero analysis mode" - :param capture_output = Capture output which will be returned in the second of the return pair :param platforms: Different set of platforms to run against :param add_gem_scan_folders: Should gem scan folders be added to the processing - by default this is off if scan folder overrides are set, on if not :param add_config_scan_folders: Should config scan folders be added to the processing - by default this is off if scan folder overrides are set, on if not - :param decode: decode byte strings from captured output to utf-8 - :return: Pair: Success, output: True if all assets were processed successfully, False if the process times out - or returns errors. output is raw output from process - + :return: Command list ready to pass to subprocess """ logger.info(f"Starting {ap_path}") command = [ap_path] @@ -784,7 +788,7 @@ class AssetProcessor(object): def prepare_test_environment(self, assets_path: str, function_name: str, use_current_root=False, relative_asset_root=None, add_scan_folder=True, cache_platform=None, - existing_function_name=None) -> str: + existing_function_name=None) -> Tuple[str, str]: """ Creates a temporary test workspace, copies the specified test assets and sets the folder as a scan folder for processing. @@ -844,7 +848,7 @@ class AssetProcessor(object): """ return self._test_assets_source_folder - def compare_assets_with_cache(self) -> str: + def compare_assets_with_cache(self) -> Tuple[List[str], List[str]]: """ Helper to compare output assets from a test run using prepare_test_environment @@ -982,14 +986,10 @@ def _build_ap_batch_call_params(ap_path, project, platforms, extra_params=None): project_str.rstrip(' ') param_list = [ap_path, project_str] - if self._disable_all_platforms: - command.append(f'--regremove="f{ASSET_PROCESSOR_SETTINGS_ROOT_KEY}/Platforms"') if platforms: if isinstance(platforms, list): platforms = ','.join(platforms) param_list.append(f'--platforms={platforms}') - for key, value in self._enabled_platform_overrides: - param_list.append(f'--regset="f{ASSET_PROCESSOR_SETTINGS_ROOT_KEY}/Platforms/{key}={value}"') if extra_params: if isinstance(extra_params, list): @@ -1053,4 +1053,3 @@ def get_num_failed_processed_assets(output): def has_invalid_server_address(output): return parse_output_value(output, 'Invalid server address') is not None -