Merge pull request #4589 from aws-lumberyard-dev/lytt_integ_linux

Platform manager and sanity tests for Linux
monroegm-disable-blank-issue-2
Sean Sweeney 4 years ago committed by GitHub
commit 1ea472bdeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -16,7 +16,7 @@ ALL_PLATFORM_OPTIONS = ['android', 'ios', 'linux', 'mac', 'windows']
ALL_LAUNCHER_OPTIONS = ['android', 'base', 'linux', 'mac', 'windows', 'windows_editor', 'windows_dedicated', 'windows_generic']
ANDROID = False
IOS = False # Not implemented - see SPEC-2505
LINUX = sys.platform.startswith('linux') # Not implemented - see SPEC-2501
LINUX = sys.platform.startswith('linux')
MAC = sys.platform.startswith('darwin')
WINDOWS = sys.platform.startswith('win')
@ -54,9 +54,11 @@ elif LINUX:
HOST_OS_PLATFORM = 'linux'
HOST_OS_EDITOR = 'linux_editor'
HOST_OS_DEDICATED_SERVER = 'linux_dedicated'
from ly_test_tools.launchers.platforms.linux.launcher import (LinuxLauncher, LinuxEditor, DedicatedLinuxLauncher)
HOST_OS_GENERIC_EXECUTABLE = 'linux_generic'
from ly_test_tools.launchers.platforms.linux.launcher import (LinuxLauncher, LinuxEditor, DedicatedLinuxLauncher, LinuxGenericLauncher)
LAUNCHERS['linux'] = LinuxLauncher
LAUNCHERS['linux_editor'] = LinuxEditor
LAUNCHERS['linux_dedicated'] = DedicatedLinuxLauncher
LAUNCHERS['linux_generic'] = LinuxGenericLauncher
else:
logger.warning(f'WARNING: LyTestTools only supports Windows, Mac, and Linux. Unexpectedly detected HOST_OS_PLATFORM: "{HOST_OS_PLATFORM}".')

@ -265,7 +265,7 @@ class AbstractResourceLocator(object):
Return path to AssetProcessor's log file using the project bin dir
:return: path to 'AP_Gui.log' file in <ap_log_dir> folder
"""
return os.path.join(self.ap_log_dir(), 'AP_Gui.log')
return os.path.join(self.ap_log_dir(), 'AP_GUI.log')
def project_cache(self):
"""

@ -23,9 +23,6 @@ class _LinuxResourceManager(AbstractResourceLocator):
"""
Override for locating resources in a Linux operating system running LyTestTools.
"""
def __init__(self, build_directory: str, project: str):
pass
def platform_config_file(self):
"""
:return: path to the platform config file

@ -31,45 +31,59 @@ def kill_processes_named(names, ignore_extensions=False):
Kills all processes with a given name
:param names: string process name, or list of strings of process name
:param ignore_extensions: ignore trailing file extension
:param ignore_extensions: ignore trailing file extensions. By default 'abc.exe' will not match 'abc'. Note that
enabling this will cause 'abc.exe' to match 'abc', 'abc.bat', and 'abc.sh', though 'abc.GameLauncher.exe'
will not match 'abc.DedicatedServer'
"""
if not names:
return
names = [names] if isinstance(names, str) else names
name_set = set()
if isinstance(names, str):
name_set.add(names)
else:
name_set.update(names)
if ignore_extensions:
names = [_remove_extension(name) for name in names]
# both exact matches and extensionless
stripped_names = set()
for name in name_set:
stripped_names.add(_remove_extension(name))
name_set.update(stripped_names)
# remove any blank names, which may empty the list
names = list(filter(lambda x: not x.isspace(), names))
if not names:
name_set = set(filter(lambda x: not x.isspace(), name_set))
if not name_set:
return
logger.info(f"Killing all processes named {names}")
process_list_to_kill = []
logger.info(f"Killing all processes named {name_set}")
process_set_to_kill = set()
for process in _safe_get_processes(['name', 'pid']):
try:
proc_name = process.name()
except psutil.AccessDenied:
logger.info(f"Process {process} permissions error during kill_processes_named()", exc_info=True)
logger.warning(f"Process {process} permissions error during kill_processes_named()", exc_info=True)
continue
except psutil.ProcessLookupError:
logger.debug(f"Process {process} could not be killed during kill_processes_named() and was likely already stopped", exc_info=True)
logger.debug(f"Process {process} could not be killed during kill_processes_named() and was likely already "
f"stopped", exc_info=True)
continue
except psutil.NoSuchProcess:
logger.debug(f"Process '{process}' was active when list of processes was requested but it was not found "
f"during kill_processes_named()", exc_info=True)
continue
if ignore_extensions:
proc_name = _remove_extension(proc_name)
if proc_name in name_set:
logger.debug(f"Found process with name {proc_name}.")
process_set_to_kill.add(process)
if proc_name in names:
logger.debug(f"Found process with name {proc_name}. Attempting to kill...")
process_list_to_kill.append(process)
if ignore_extensions:
extensionless_name = _remove_extension(proc_name)
if extensionless_name in name_set:
process_set_to_kill.add(process)
_safe_kill_process_list(process_list_to_kill)
if process_set_to_kill:
_safe_kill_processes(process_set_to_kill)
def kill_processes_started_from(path):
@ -90,7 +104,7 @@ def kill_processes_started_from(path):
if process_path.lower().startswith(path.lower()):
process_list.append(process)
_safe_kill_process_list(process_list)
_safe_kill_processes(process_list)
else:
logger.warning(f"Path:'{path}' not found")
@ -118,7 +132,7 @@ def kill_processes_with_name_not_started_from(name, path):
logger.info("%s -> %s" % (os.path.dirname(process_path.lower()), path))
proccesses_to_kill.append(process)
_safe_kill_process_list(proccesses_to_kill)
_safe_kill_processes(proccesses_to_kill)
else:
logger.warning(f"Path:'{path}' not found")
@ -151,10 +165,12 @@ def process_exists(name, ignore_extensions=False):
:return: A boolean determining whether the process is alive or not
"""
name = name.lower()
if ignore_extensions:
name = _remove_extension(name)
if name.isspace():
return False
if ignore_extensions:
name_extensionless = _remove_extension(name)
for process in _safe_get_processes(["name"]):
try:
proc_name = process.name().lower()
@ -165,10 +181,17 @@ def process_exists(name, ignore_extensions=False):
except psutil.AccessDenied as e:
logger.info(f"Permissions issue on {process} during process_exists check", exc_info=True)
continue
if proc_name == name: # abc.exe matches abc.exe
return True
if ignore_extensions:
proc_name = _remove_extension(proc_name)
if proc_name == name:
proc_name_extensionless = _remove_extension(proc_name)
if proc_name_extensionless == name: # abc matches abc.exe
return True
if proc_name == name_extensionless: # abc.exe matches abc
return True
# don't check proc_name_extensionless against name_extensionless: abc.exe and abc.exe are already tested,
# however xyz.Gamelauncher should not match xyz.DedicatedServer
return False
@ -341,17 +364,14 @@ def _safe_kill_process(proc):
except Exception: # purposefully broad
logger.warning("Unexpected exception while terminating process", exc_info=True)
def _safe_kill_process_list(proc_list):
def _safe_kill_processes(processes):
"""
Kills a given process without raising an error
:param proc_list: The process list to kill
:param processes: An iterable of processes to kill
"""
def on_terminate(proc):
print(f"process '{proc.name()}' with id '{proc.pid}' terminated with exit code {proc.returncode}")
for proc in proc_list:
for proc in processes:
try:
logger.info(f"Terminating process '{proc.name()}' with id '{proc.pid}'")
proc.kill()
@ -360,12 +380,14 @@ def _safe_kill_process_list(proc_list):
except psutil.NoSuchProcess:
logger.debug("Termination request ignored, process was already terminated during iteration", exc_info=True)
except Exception: # purposefully broad
logger.warning("Unexpected exception while terminating process", exc_info=True)
logger.warning("Unexpected exception ignored while terminating process", exc_info=True)
def on_terminate(proc):
logger.info(f"process '{proc.name()}' with id '{proc.pid}' terminated with exit code {proc.returncode}")
try:
psutil.wait_procs(proc_list, timeout=30, callback=on_terminate)
psutil.wait_procs(processes, timeout=30, callback=on_terminate)
except Exception: # purposefully broad
logger.warning("Unexpected exception while waiting for process to terminate", exc_info=True)
logger.warning("Unexpected exception while waiting for processes to terminate", exc_info=True)
def _terminate_and_confirm_dead(proc):
@ -383,7 +405,7 @@ def _terminate_and_confirm_dead(proc):
def _remove_extension(filename):
"""
Returns a file name without its extension
Returns a file name without its extension, if any is present
:param filename: The name of a file
:return: The name of the file without the extension
@ -465,3 +487,17 @@ def close_windows_process(pid, timeout=20, raise_on_missing=False):
# Wait for asyncronous termination
waiter.wait_for(lambda: pid not in psutil.pids(), timeout=timeout,
exc=TimeoutError(f"Process {pid} never terminated"))
def get_display_env():
"""
Fetches environment variables with an appropriate display (monitor) configured,
useful for subprocess calls to UI applications
:return: A dictionary containing environment variables (per os.environ)
"""
env = os.environ.copy()
if not ly_test_tools.WINDOWS:
if 'DISPLAY' not in env.keys():
# assume Display 1 is available in another session
env['DISPLAY'] = ':1'
return env

@ -13,6 +13,7 @@ import subprocess
import ly_test_tools.environment.waiter
import ly_test_tools.launchers.exceptions
import ly_test_tools.environment.process_utils as process_utils
from ly_test_tools.launchers.platforms.base import Launcher
from ly_test_tools.launchers.exceptions import TeardownError, ProcessNotStartedError
@ -68,7 +69,7 @@ class LinuxLauncher(Launcher):
"""
command = [self.binary_path()] + self.args
self._tmpout = TemporaryFile()
self._proc = subprocess.Popen(command, stdout=self._tmpout, stderr=self._tmpout, universal_newlines=True)
self._proc = subprocess.Popen(command, stdout=self._tmpout, stderr=self._tmpout, universal_newlines=True, env=process_utils.get_display_env())
log.debug(f"Started Linux Launcher with command: {command}")
def get_output(self, encoding="utf-8"):
@ -221,3 +222,28 @@ class LinuxEditor(LinuxLauncher):
"""
assert self.workspace.project is not None
return os.path.join(self.workspace.paths.build_directory(), "Editor")
class LinuxGenericLauncher(LinuxLauncher):
def __init__(self, build, exe_file_name, args=None):
super(LinuxLauncher, self).__init__(build, args)
self.exe_file_name = exe_file_name
self.expected_executable_path = os.path.join(
self.workspace.paths.build_directory(), f"{self.exe_file_name}")
if not os.path.exists(self.expected_executable_path):
raise ProcessNotStartedError(
f"Unable to locate executable '{self.exe_file_name}' "
f"in path: '{self.expected_executable_path}'")
def binary_path(self):
"""
Return full path to the executable file for this build's configuration and project
Relies on the build_directory() in self.workspace.paths to be accurate
:return: full path to the given exe file
"""
assert self.workspace.project is not None, (
'Project cannot be NoneType - please specify a project name string.')
return self.expected_executable_path

@ -7,21 +7,22 @@ SPDX-License-Identifier: Apache-2.0 OR MIT
A class to control functionality of Lumberyard's asset processor.
The class manages a workspace's asset processor and asset configurations.
"""
import os
import datetime
import logging
import subprocess
import socket
import time
import tempfile
import os
import psutil
import shutil
import socket
import stat
import subprocess
import tempfile
import time
from typing import List, Tuple
import psutil
import ly_test_tools
import ly_test_tools.environment.waiter as waiter
import ly_test_tools.environment.file_system as file_system
import ly_test_tools.environment.process_utils as process_utils
import ly_test_tools.environment.waiter as waiter
import ly_test_tools.o3de.pipeline_utils as utils
from ly_test_tools.o3de.ap_log_parser import APLogParser
@ -177,24 +178,26 @@ class AssetProcessor(object):
"""
Read the a port chosen by AP from the log
"""
start_time = time.time()
read_port_timeout = 10
while (time.time() - start_time) < read_port_timeout:
port = None
def _get_port_from_log():
nonlocal port
if not os.path.exists(self._workspace.paths.ap_gui_log()):
logger.debug(f"Log at {self._workspace.paths.ap_gui_log()} doesn't exist, sleeping")
else:
return False
log = APLogParser(self._workspace.paths.ap_gui_log())
if len(log.runs):
try:
port = log.runs[-1][port_type]
if port:
logger.info(f"Read port type {port_type} : {port}")
logger.debug(f"Read port type {port_type} : {port}")
return True
except Exception as ex: # intentionally broad
logger.debug("Failed to read port from file", exc_info=ex)
return False
err = AssetProcessorError(f"Failed to read port type {port_type} from {self._workspace.paths.ap_gui_log()}")
waiter.wait_for(_get_port_from_log, timeout=10, exc=err)
return port
except Exception: # intentionally broad
pass
time.sleep(1)
logger.warning(f"Failed to read port type {port_type}")
return 0
def set_control_connection(self, connection):
self._control_connection = connection
@ -206,13 +209,8 @@ class AssetProcessor(object):
"""
if not self._control_connection:
control_timeout = 60
try:
return self.connect_socket("Control Connection", self.read_control_port,
set_port_method=self.set_control_connection, timeout=control_timeout)
except AssetProcessorError as e:
# We dont want a failure of our test socket connection to fail the entire test automatically.
logger.error(f"Failed to connect control socket with error {e}")
pass
return True, None
def using_temp_workspace(self):
@ -227,34 +225,40 @@ class AssetProcessor(object):
:param set_port_method: If set, method to call with the established connection
:param timeout: Max seconds to attempt connection for
"""
connection_timeout = timeout
connect_port = read_port_method()
logger.debug(f"Waiting for connection to AP {port_name}: {host}:{connect_port}, "
f"{connection_timeout} seconds remaining")
start_time = time.time()
while (time.time() - start_time) < connection_timeout:
logger.debug(f"Attempting to for connect to AP {port_name}: {host}:{connect_port} for {timeout} seconds")
def _attempt_connection():
nonlocal connect_port
if self._ap_proc.poll() is not None:
raise AssetProcessorError(f"Asset processor exited early with errorcode: {self._ap_proc.returncode}")
connection_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
connection_socket.settimeout(10.0)
connection_socket.settimeout(timeout)
try:
connection_socket.connect((host, connect_port))
logger.debug(f"Connection to AP {port_name} was successful")
if set_port_method is not None:
set_port_method(connection_socket)
return True, None
except Exception: # Purposefully broad
# Short delay to prevent immediate failure due to slower starting applications such as debug builds
time.sleep(0.01)
return True
except Exception as ex: # Purposefully broad
logger.debug(f"Failed to connect to {host}:{connect_port}", exc_info=ex)
if not connect_port or not self.using_temp_workspace():
# If we're not using a temp workspace with a fresh log it's possible we're reading a port from
# a previous run and the log just hasn't written yet, we need to keep checking the log for a new
# port to use
try:
new_connect_port = read_port_method()
if new_connect_port != connect_port:
logger.debug(
f"Read new connect port for {port_name}: {host}:{new_connect_port}")
logger.debug(f"Found new connect port for {port_name}: {host}:{new_connect_port}")
connect_port = new_connect_port
raise AssetProcessorError(f"Could not connect to AP {port_name}")
except Exception as read_exception: # Purposefully broad
logger.debug(f"Failed to read port data", exc_info=read_exception)
return False
err = AssetProcessorError(f"Could not connect to AP {port_name} on {host}:{connect_port}")
waiter.wait_for(_attempt_connection, timeout=timeout, exc=err)
return True, None
def stop(self, timeout=60):
"""
@ -436,15 +440,14 @@ class AssetProcessor(object):
extra_params=None, add_gem_scan_folders=None, add_config_scan_folders=None, decode=True,
expect_failure=False, quitonidle=False, connect_to_ap=False, accept_input=True, run_until_idle=True,
scan_folder_pattern=None):
ap_path = self._workspace.paths.asset_processor()
ap_path = os.path.abspath(self._workspace.paths.asset_processor())
ap_exe_path = os.path.dirname(ap_path)
extra_gui_params = []
if quitonidle:
extra_gui_params.append("--quitonidle")
if accept_input:
extra_gui_params.append("--acceptInput")
ap_exe_path = os.path.dirname(self._workspace.paths.asset_processor())
logger.info("Starting asset processor")
if self.process_exists():
logger.error("Asset processor already started. Stop first")
@ -483,20 +486,30 @@ class AssetProcessor(object):
logger.warning(f"Cannot capture output when leaving AP connection open.")
logger.info(f"Launching AP with command: {command}")
self._ap_proc = subprocess.Popen(command, cwd=ap_exe_path)
try:
self._ap_proc = subprocess.Popen(command, cwd=ap_exe_path, env=process_utils.get_display_env())
if accept_input and not quitonidle:
if accept_input:
self.connect_control()
if connect_to_ap:
self.connect_listen()
if quitonidle:
waiter.wait_for(lambda: not self.process_exists(), timeout=timeout)
waiter.wait_for(lambda: not self.process_exists(), timeout=timeout,
exc=AssetProcessorError(f"Failed to quit on idle within {timeout} seconds"))
elif run_until_idle and accept_input:
if not self.wait_for_idle():
return False, None
return True, None
except BaseException as be: # purposefully broad
logger.exception("Exception while starting Asset Processor", be)
# clean up to avoid leaking open AP process to future tests
try:
self._ap_proc.kill()
except Exception as ex:
logger.exception("Ignoring exception while trying to terminate Asset Processor", ex)
raise # raise whatever prompted us to clean up
def connect_listen(self, timeout=DEFAULT_TIMEOUT_SECONDS):
# Wait for the AP we launched to be ready to accept a connection
@ -581,14 +594,10 @@ class AssetProcessor(object):
:param expect_failure: asset processing is expected to fail, so don't error on a failure, and assert on no failure.
"""
logger.info(f"Launching AP with command: {command}")
start = datetime.datetime.now()
try:
duration = datetime.timedelta(seconds=timeout)
except TypeError:
logger.warning("Cannot set timeout value of '{}' seconds, defaulting to {} hours".format(
timeout, DEFAULT_TIMEOUT_HOURS))
duration = datetime.timedelta(hours=DEFAULT_TIMEOUT_HOURS)
timeout = duration.total_seconds()
start = time.time()
if type(timeout) not in [int, float] or timeout < 1:
logger.warning(f"Invalid timeout {timeout} - defaulting to {DEFAULT_TIMEOUT_SECONDS} seconds")
timeout = DEFAULT_TIMEOUT_SECONDS
run_result = subprocess.run(command, close_fds=True, timeout=timeout, capture_output=capture_output)
output_list = None
@ -609,8 +618,7 @@ class AssetProcessor(object):
elif expect_failure:
logger.error(f"{command} was expected to fail, but instead ran without failure.")
return True, output_list
logger.info(
f"{command} completed successfully in {(datetime.datetime.now() - start).seconds} seconds")
logger.info(f"{command} completed successfully in {time.time() - start} seconds")
return True, output_list
def set_failure_log_folder(self, log_root):
@ -743,14 +751,14 @@ class AssetProcessor(object):
:return: Absolute path of added scan folder
"""
if os.path.isabs(folder_name):
if not folder_name in self._override_scan_folders:
if folder_name not in self._override_scan_folders:
self._override_scan_folders.append(folder_name)
logger.info(f'Adding override scan folder {folder_name}')
return folder_name
else:
if not self._temp_asset_root:
logger.warning(f"Can't create scan folder, no temporary asset workspace has been created")
return
logger.warning(f"Can not create scan folder, no temporary asset workspace has been created")
return ""
scan_folder = os.path.join(self._temp_asset_root if self._temp_asset_root else self._workspace.paths.engine_root(),
folder_name)
if not os.path.isdir(scan_folder):

@ -9,8 +9,7 @@ import logging
import os
import subprocess
import ly_test_tools
from ly_test_tools.environment.process_utils import kill_processes_named as kill_processes_named
import ly_test_tools.environment.process_utils as process_utils
logger = logging.getLogger(__name__)
@ -23,7 +22,7 @@ def start_asset_processor(bin_dir):
:return: A subprocess.Popen object for the AssetProcessor process.
"""
os.chdir(bin_dir)
asset_processor = subprocess.Popen(['AssetProcessor.exe'])
asset_processor = subprocess.Popen(['AssetProcessor'], env=process_utils.get_display_env())
return_code = asset_processor.poll()
if return_code is not None and return_code != 0:
@ -40,11 +39,9 @@ def kill_asset_processor():
:return: None
"""
kill_processes_named('AssetProcessor_tmp', ignore_extensions=True)
kill_processes_named('AssetProcessor', ignore_extensions=True)
kill_processes_named('AssetProcessorBatch', ignore_extensions=True)
kill_processes_named('AssetBuilder', ignore_extensions=True)
kill_processes_named('rc', ignore_extensions=True)
kill_processes_named('Lua Editor', ignore_extensions=True)
process_utils.kill_processes_named('AssetProcessor_tmp', ignore_extensions=True)
process_utils.kill_processes_named('AssetProcessor', ignore_extensions=True)
process_utils.kill_processes_named('AssetProcessorBatch', ignore_extensions=True)
process_utils.kill_processes_named('AssetBuilder', ignore_extensions=True)
process_utils.kill_processes_named('rc', ignore_extensions=True)
process_utils.kill_processes_named('Lua Editor', ignore_extensions=True)

@ -63,7 +63,6 @@ class TestAutomatedTestingProject(object):
# Clean up processes after the test is finished
process_utils.kill_processes_named(names=process_utils.LY_PROCESS_KILL_LIST, ignore_extensions=True)
@pytest.mark.skipif(not ly_test_tools.WINDOWS, reason="Editor currently only functions on Windows")
def test_StartEditor_Sanity(self, project):
"""
The `test_StartEditor_Sanity` test function is similar to the previous example with minor adjustments. A
@ -86,7 +85,7 @@ class TestAutomatedTestingProject(object):
# Call the Editor executable
with editor.start():
# Wait for the process to exist
waiter.wait_for(lambda: process_utils.process_exists("Editor", ignore_extensions=True))
waiter.wait_for(lambda: process_utils.process_exists("Editor.exe", ignore_extensions=True))
finally:
# Clean up processes after the test is finished
process_utils.kill_processes_named(names=process_utils.LY_PROCESS_KILL_LIST, ignore_extensions=True)

@ -58,10 +58,8 @@ class TestAssetProcessor(object):
under_test.start(connect_to_ap=True)
assert under_test._ap_proc is not None
mock_popen.assert_called_once_with([mock_ap_path, '--zeroAnalysisMode',
f'--regset="/Amazon/AzCore/Bootstrap/project_path={mock_project_path}"',
'--logDir', under_test.log_root(),
'--acceptInput', '--platforms', 'bar'], cwd=os.path.dirname(mock_ap_path))
mock_popen.assert_called_once()
assert '--zeroAnalysisMode' in mock_popen.call_args[0][0]
mock_connect.assert_called()
@mock.patch('ly_test_tools._internal.managers.workspace.AbstractWorkspaceManager')
@ -150,10 +148,8 @@ class TestAssetProcessor(object):
result, _ = under_test.batch_process(None, False)
assert not result
mock_run.assert_called_once_with([apb_path,
f'--regset="/Amazon/AzCore/Bootstrap/project_path={mock_project_path}"',
'--logDir', under_test.log_root()],
close_fds=True, capture_output=False, timeout=28800.0)
mock_run.assert_called_once()
assert f'--regset="/Amazon/AzCore/Bootstrap/project_path={mock_project_path}"' in mock_run.call_args[0][0]
@mock.patch('ly_test_tools._internal.managers.workspace.AbstractWorkspaceManager')

@ -223,7 +223,7 @@ class TestCloseWindowsProcess(unittest.TestCase):
mock_enum.assert_called_once()
class Test(unittest.TestCase):
class TestProcessMatching(unittest.TestCase):
@mock.patch("ly_test_tools.environment.process_utils._safe_get_processes")
def test_ProcExists_HasExtension_Found(self, mock_get_proc):
@ -261,18 +261,55 @@ class Test(unittest.TestCase):
self.assertTrue(result)
proc_mock.name.assert_called()
@mock.patch('ly_test_tools.environment.process_utils._safe_kill_process', mock.MagicMock)
@mock.patch('ly_test_tools.environment.process_utils._safe_kill_processes')
@mock.patch('ly_test_tools.environment.process_utils._safe_get_processes')
def test_KillProcNamed_MockKill_SilentSuccess(self, mock_get_proc):
def test_KillProcNamed_ExactMatch_Killed(self, mock_get_proc, mock_kill_proc):
name = "dummy.exe"
proc_mock = mock.MagicMock()
proc_mock.name.return_value = name
mock_get_proc.return_value = [proc_mock]
process_utils.kill_processes_named("dummy.exe", ignore_extensions=False)
mock_kill_proc.assert_called()
proc_mock.name.assert_called()
@mock.patch('ly_test_tools.environment.process_utils._safe_kill_processes')
@mock.patch('ly_test_tools.environment.process_utils._safe_get_processes')
def test_KillProcNamed_NearMatch_Ignore(self, mock_get_proc, mock_kill_proc):
name = "dummy.exe"
proc_mock = mock.MagicMock()
proc_mock.name.return_value = name
mock_get_proc.return_value = [proc_mock]
process_utils.kill_processes_named("dummy", ignore_extensions=False)
mock_kill_proc.assert_not_called()
proc_mock.name.assert_called()
@mock.patch('ly_test_tools.environment.process_utils._safe_kill_processes')
@mock.patch('ly_test_tools.environment.process_utils._safe_get_processes')
def test_KillProcNamed_NearMatchIgnoreExtension_Kill(self, mock_get_proc, mock_kill_proc):
name = "dummy.exe"
proc_mock = mock.MagicMock()
proc_mock.name.return_value = name
mock_get_proc.return_value = [proc_mock]
process_utils.kill_processes_named("dummy", ignore_extensions=True)
mock_kill_proc.assert_called()
proc_mock.name.assert_called()
@mock.patch('ly_test_tools.environment.process_utils._safe_kill_processes')
@mock.patch('ly_test_tools.environment.process_utils._safe_get_processes')
def test_KillProcNamed_ExactMatchIgnoreExtension_Killed(self, mock_get_proc, mock_kill_proc):
name = "dummy.exe"
proc_mock = mock.MagicMock()
proc_mock.name.return_value = name
mock_get_proc.return_value = [proc_mock]
process_utils.kill_processes_named("dummy.exe", ignore_extensions=True)
mock_kill_proc.assert_called()
proc_mock.name.assert_called()
@mock.patch('ly_test_tools.environment.process_utils._safe_kill_process', mock.MagicMock)
@mock.patch('ly_test_tools.environment.process_utils._safe_kill_processes', mock.MagicMock)
@mock.patch('ly_test_tools.environment.process_utils._safe_get_processes')
@mock.patch('os.path.exists')
def test_KillProcFrom_MockKill_SilentSuccess(self, mock_path, mock_get_proc):
@ -293,7 +330,7 @@ class Test(unittest.TestCase):
mock_kill.assert_called()
@mock.patch('ly_test_tools.environment.process_utils._safe_kill_process', mock.MagicMock)
@mock.patch('ly_test_tools.environment.process_utils._safe_kill_processes', mock.MagicMock)
@mock.patch('psutil.Process')
def test_KillProcPid_NoProc_SilentPass(self, mock_psutil):
mock_proc = mock.MagicMock()
@ -302,7 +339,7 @@ class Test(unittest.TestCase):
process_utils.kill_process_with_pid(1)
@mock.patch('ly_test_tools.environment.process_utils._safe_kill_process', mock.MagicMock)
@mock.patch('ly_test_tools.environment.process_utils._safe_kill_processes', mock.MagicMock)
@mock.patch('psutil.Process')
def test_KillProcPidRaiseOnMissing_NoProc_Raises(self, mock_psutil):
mock_proc = mock.MagicMock()
@ -339,7 +376,7 @@ class Test(unittest.TestCase):
mock_wait_procs.side_effect = psutil.PermissionError()
proc_mock = mock.MagicMock()
process_utils._safe_kill_process_list(proc_mock)
process_utils._safe_kill_processes(proc_mock)
mock_wait_procs.assert_called()
mock_log_warn.assert_called()

Loading…
Cancel
Save