Reduce AP race conditions and fix minor bugs

main
sweeneys 5 years ago
parent 0ae123eae6
commit 9141221843

@ -108,15 +108,7 @@ class Launcher(object):
# Wait for the AssetProcessor to be open. # Wait for the AssetProcessor to be open.
if launch_ap: if launch_ap:
timeout = 10 self.workspace.asset_processor.start(connect_to_ap=True, connection_timeout=10) # verify connection
self.workspace.asset_processor.start()
ly_test_tools.environment.waiter.wait_for(
lambda: ly_test_tools.environment.process_utils.process_exists(
name="AssetProcessor", ignore_extensions=True),
exc=ly_test_tools.launchers.exceptions.SetupError(
f'AssetProcessor never opened after {timeout} seconds'),
timeout=timeout
)
self.workspace.asset_processor.wait_for_idle() self.workspace.asset_processor.wait_for_idle()
log.debug('AssetProcessor started from calling Launcher.setup()') log.debug('AssetProcessor started from calling Launcher.setup()')

@ -20,7 +20,7 @@ import time
import tempfile import tempfile
import shutil import shutil
import stat import stat
from typing import List from typing import List, Tuple
import psutil import psutil
import ly_test_tools import ly_test_tools
@ -45,6 +45,7 @@ ASSET_PROCESSOR_PLATFORM_MAP = {
ASSET_PROCESSOR_SETTINGS_ROOT_KEY = '/Amazon/AssetProcessor/Settings' ASSET_PROCESSOR_SETTINGS_ROOT_KEY = '/Amazon/AssetProcessor/Settings'
class AssetProcessorError(Exception): class AssetProcessorError(Exception):
""" Indicates that the AssetProcessor raised an error """ """ Indicates that the AssetProcessor raised an error """
@ -88,11 +89,16 @@ class AssetProcessor(object):
def wait_for_idle(self, timeout=DEFAULT_TIMEOUT_SECONDS): def wait_for_idle(self, timeout=DEFAULT_TIMEOUT_SECONDS):
""" """
Communicate with asset processor to request a response if either AP is currently idle, Communicate with asset processor to request a response if either AP is currently idle,
or whenever it becomes idle or that it becomes idle within the timeout
""" """
if not self.process_exists():
logger.warning("Not currently managing a running Asset Processor, cannot request idle")
return False
self.send_message("waitforidle") self.send_message("waitforidle")
result = self.read_message(read_timeout=timeout) result = self.read_message(read_timeout=timeout)
assert result == "idle" or not self.process_exists(), f"Couldn't get idle state from AP, message {result}" assert self.process_exists(), "Asset Processor appears unexpectedly shut down, or has crashed"
assert result == "idle", f"Did not get idle state from AP, message was instead: {result}"
return True return True
def next_idle(self): def next_idle(self):
@ -103,9 +109,14 @@ class AssetProcessor(object):
which will be picked up by the scanner but may take a couple seconds to begin, and you only want which will be picked up by the scanner but may take a couple seconds to begin, and you only want
to hear back after it's done. to hear back after it's done.
""" """
if not self.process_exists():
logger.warning("Not currently managing a running Asset Processor, cannot request idle")
return
self.send_message("signalidle") self.send_message("signalidle")
result = self.read_message() result = self.read_message()
assert result == "idle" or not self.process_exists(), f"Couldn't get idle state from AP, message {result}" assert self.process_exists(), "Asset Processor appears unexpectedly shut down, or has crashed"
assert result == "idle", f"Did not get idle state from AP, message was instead: {result}"
def send_quit(self): def send_quit(self):
""" """
@ -160,8 +171,6 @@ class AssetProcessor(object):
logger.warning(f"Failed to read message from with error {e}") logger.warning(f"Failed to read message from with error {e}")
return f"error_{e}" return f"error_{e}"
def read_control_port(self): def read_control_port(self):
return self.read_port_from_log("Control Port") return self.read_port_from_log("Control Port")
@ -185,7 +194,7 @@ class AssetProcessor(object):
if port: if port:
logger.info(f"Read port type {port_type} : {port}") logger.info(f"Read port type {port_type} : {port}")
return port return port
except: except Exception: # intentionally broad
pass pass
time.sleep(1) time.sleep(1)
logger.warning(f"Failed to read port type {port_type}") logger.warning(f"Failed to read port type {port_type}")
@ -203,7 +212,7 @@ class AssetProcessor(object):
control_timeout = 60 control_timeout = 60
try: try:
return self.connect_socket("Control Connection", self.read_control_port, return self.connect_socket("Control Connection", self.read_control_port,
set_port_method=self.set_control_connection, timeout=control_timeout) set_port_method=self.set_control_connection, timeout=control_timeout)
except AssetProcessorError as e: except AssetProcessorError as e:
# We dont want a failure of our test socket connection to fail the entire test automatically. # We dont want a failure of our test socket connection to fail the entire test automatically.
logger.error(f"Failed to connect control socket with error {e}") logger.error(f"Failed to connect control socket with error {e}")
@ -237,7 +246,7 @@ class AssetProcessor(object):
if set_port_method is not None: if set_port_method is not None:
set_port_method(connection_socket) set_port_method(connection_socket)
return True, None return True, None
except Exception as e: # Purposefully broad except Exception: # Purposefully broad
# Short delay to prevent immediate failure due to slower starting applications such as debug builds # Short delay to prevent immediate failure due to slower starting applications such as debug builds
time.sleep(0.01) time.sleep(0.01)
if not connect_port or not self.using_temp_workspace(): if not connect_port or not self.using_temp_workspace():
@ -261,7 +270,7 @@ class AssetProcessor(object):
:return: None :return: None
""" """
if not self._ap_proc: if not self._ap_proc:
logger.info("Attempting to quit AP but none running") logger.warning("Attempting to quit AP but none running")
return return
if not self._control_connection: if not self._control_connection:
@ -319,7 +328,7 @@ class AssetProcessor(object):
pass pass
_, remaining = psutil.wait_procs(process_list, timeout=10, callback=term_success) _, remaining = psutil.wait_procs(process_list, timeout=10, callback=term_success)
for process in remaining: for process in remaining:
logger.info(f"Killing: {this_process.name()} pid: {this_process.pid}") logger.info(f"Killing: {process.name()} pid: {process.pid}")
process.kill() process.kill()
logger.info("Finished terminating asset processor") logger.info("Finished terminating asset processor")
self._ap_proc = None self._ap_proc = None
@ -500,22 +509,17 @@ class AssetProcessor(object):
def build_ap_command(self, ap_path, fastscan=True, platforms=None, def build_ap_command(self, ap_path, fastscan=True, platforms=None,
extra_params=None, add_gem_scan_folders=None, add_config_scan_folders=None, extra_params=None, add_gem_scan_folders=None, add_config_scan_folders=None,
scan_folder_pattern=None): scan_folder_pattern=None):
# type: (float, bool) -> bool
""" """
Launch asset processor batch and wait for it to complete or until the timeout expires. Launch asset processor batch and wait for it to complete or until the timeout expires.
Returns true on success and False if an error is reported. Returns true on success and False if an error is reported.
:param fastscan: Enable "zero analysis mode" :param fastscan: Enable "zero analysis mode"
:param capture_output = Capture output which will be returned in the second of the return pair
:param platforms: Different set of platforms to run against :param platforms: Different set of platforms to run against
:param add_gem_scan_folders: Should gem scan folders be added to the processing - by default this is off :param add_gem_scan_folders: Should gem scan folders be added to the processing - by default this is off
if scan folder overrides are set, on if not if scan folder overrides are set, on if not
:param add_config_scan_folders: Should config scan folders be added to the processing - by default this is off :param add_config_scan_folders: Should config scan folders be added to the processing - by default this is off
if scan folder overrides are set, on if not if scan folder overrides are set, on if not
:param decode: decode byte strings from captured output to utf-8 :return: Command list ready to pass to subprocess
:return: Pair: Success, output: True if all assets were processed successfully, False if the process times out
or returns errors. output is raw output from process
""" """
logger.info(f"Starting {ap_path}") logger.info(f"Starting {ap_path}")
command = [ap_path] command = [ap_path]
@ -784,7 +788,7 @@ class AssetProcessor(object):
def prepare_test_environment(self, assets_path: str, function_name: str, use_current_root=False, def prepare_test_environment(self, assets_path: str, function_name: str, use_current_root=False,
relative_asset_root=None, add_scan_folder=True, cache_platform=None, relative_asset_root=None, add_scan_folder=True, cache_platform=None,
existing_function_name=None) -> str: existing_function_name=None) -> Tuple[str, str]:
""" """
Creates a temporary test workspace, copies the specified test assets and sets the folder as a scan Creates a temporary test workspace, copies the specified test assets and sets the folder as a scan
folder for processing. folder for processing.
@ -844,7 +848,7 @@ class AssetProcessor(object):
""" """
return self._test_assets_source_folder return self._test_assets_source_folder
def compare_assets_with_cache(self) -> str: def compare_assets_with_cache(self) -> Tuple[List[str], List[str]]:
""" """
Helper to compare output assets from a test run using prepare_test_environment Helper to compare output assets from a test run using prepare_test_environment
@ -982,14 +986,10 @@ def _build_ap_batch_call_params(ap_path, project, platforms, extra_params=None):
project_str.rstrip(' ') project_str.rstrip(' ')
param_list = [ap_path, project_str] param_list = [ap_path, project_str]
if self._disable_all_platforms:
command.append(f'--regremove="f{ASSET_PROCESSOR_SETTINGS_ROOT_KEY}/Platforms"')
if platforms: if platforms:
if isinstance(platforms, list): if isinstance(platforms, list):
platforms = ','.join(platforms) platforms = ','.join(platforms)
param_list.append(f'--platforms={platforms}') param_list.append(f'--platforms={platforms}')
for key, value in self._enabled_platform_overrides:
param_list.append(f'--regset="f{ASSET_PROCESSOR_SETTINGS_ROOT_KEY}/Platforms/{key}={value}"')
if extra_params: if extra_params:
if isinstance(extra_params, list): if isinstance(extra_params, list):
@ -1053,4 +1053,3 @@ def get_num_failed_processed_assets(output):
def has_invalid_server_address(output): def has_invalid_server_address(output):
return parse_output_value(output, 'Invalid server address') is not None return parse_output_value(output, 'Invalid server address') is not None

Loading…
Cancel
Save