You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
o3de/Tools/LauncherTestTools/run_launcher_tests.py

132 lines
5.1 KiB
Python

"""
Copyright (c) Contributors to the Open 3D Engine Project. For complete copyright and license terms please see the LICENSE at the root of this distribution.
SPDX-License-Identifier: Apache-2.0 OR MIT
"""
import json
import logging
import os
import time
import shutil
import subprocess
import sys
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class PlatformDriver:
    """Base driver that discovers launcher tests and monitors their output.

    Test definitions are read from ``launcher_tests.json`` inside the
    project's launcher tests folder. ``run_test`` is a stub meant to be
    overridden by derived classes. ``monitor_process_output`` calls
    ``self.take_screenshot``, which is not defined in this class, so a derived
    class has to supply it before screenshots can be taken.
    """

    # Pause used when no new output is available, so that the monitoring loop
    # does not spin at full CPU while waiting for the launcher.
    _IDLE_POLL_SECONDS = 0.05

    def __init__(self, project_json_path, project_launcher_tests_folder, test_names, screenshots_folder, screenshots_interval):
        """Store the configuration used by the other methods.

        :param project_json_path: Path of the project json file (read by ``read_project_name``).
        :param project_launcher_tests_folder: Folder that contains ``launcher_tests.json``.
        :param test_names: Names of the tests to run; when empty every test is run.
        :param screenshots_folder: Folder for screenshots; deleted at the start of ``run_launcher_tests``.
        :param screenshots_interval: Time between screenshots, measured with ``time.time()`` (seconds).
        """
        self.project_json_path = project_json_path
        self.project_launcher_tests_folder = project_launcher_tests_folder
        self.test_names = test_names
        self.screenshots_folder = screenshots_folder
        self.screenshots_interval = screenshots_interval

    def read_json_data(self, path):
        """Read a json file and return the parsed data.

        Any failure to open or parse the file is logged and then terminates
        the process through ``sys.exit(1)``.

        :param path: Path of the json file to load.
        :return: The object produced by ``json.load``.
        """
        try:
            with open(path) as json_file:
                json_data = json.load(json_file)
        except Exception as e:
            logger.error("Failed to read json file: '{}'".format(path))
            logger.error("Exception: '{}'".format(e))
            sys.exit(1)
        return json_data

    def read_project_name(self):
        """Return the ``project_name`` entry of the project json file."""
        project_data = self.read_json_data(self.project_json_path)
        return project_data['project_name']

    def run_test(self, map, dynamic_slice, timeout, pass_string, fail_string):
        """ Meant to be overridden in derived classes.

        NOTE(review): ``run_launcher_tests`` calls this method with the test
        name as an additional first argument (six arguments in total), so this
        stub cannot be invoked from there as declared; overrides presumably
        accept the test name first. Confirm against the derived drivers.

        :return: Always True in this base implementation.
        """
        return True

    def run_launcher_tests(self):
        """Discover the tests in launcher_tests.json and run them.

        When ``self.test_names`` is non-empty, only tests whose name matches
        one of them (case-insensitively) are considered.

        :return: True when every executed test returned a truthy value.
        """
        # Delete the old screenshots folder if it exists
        if os.path.exists(self.screenshots_folder):
            shutil.rmtree(self.screenshots_folder)

        # Read the launcher_tests.json file.
        launcher_tests_data = self.read_json_data(
            os.path.join(self.project_launcher_tests_folder, 'launcher_tests.json'))

        # Lower-case the requested names once rather than once per test entry.
        requested_names = None
        if self.test_names:
            requested_names = {x.lower() for x in self.test_names}

        # Run each of the tests found in the launcher_tests.json file.
        ok = True
        for launcher_test_data in launcher_tests_data['launcher_tests']:
            name = launcher_test_data.get('name')

            # Skip over this test if specific tests are specified and this is
            # not one of them. An entry without a name cannot match a request.
            if requested_names is not None:
                if name is None or name.lower() not in requested_names:
                    continue

            # ``and`` short-circuits: once a test has failed, the remaining
            # tests are no longer executed.
            ok = ok and self.run_test(
                name,
                launcher_test_data.get('map'),
                launcher_test_data.get('dynamic_slice'),
                launcher_test_data.get('timeout'),
                launcher_test_data.get('pass_string', 'AUTO_LAUNCHER_TEST_COMPLETE'),
                launcher_test_data.get('fail_string', 'AUTO_LAUNCHER_TEST_FAIL'))
        return ok

    def monitor_process_output(self, test_name, command, pass_string, fail_string, timeout, log_file=None):
        """Start ``command`` and watch its output for the pass or fail string.

        Output is read from ``log_file`` when one is given, otherwise from the
        stdout of the started process. Monitoring stops when the pass string
        or fail string shows up in a line, when the timeout is exceeded, or
        when the process has exited and no output is left. While monitoring,
        ``self.take_screenshot`` is called each time the screenshot interval
        has elapsed. The process is stored in ``self.process`` and is not
        terminated by this method.

        :param test_name: Test name, used to build the screenshot file names.
        :param command: Command handed to ``subprocess.Popen``.
        :param pass_string: Text whose presence in a line marks success.
        :param fail_string: Text whose presence in a line marks failure.
        :param timeout: Maximum monitoring time, compared against ``time.time()`` differences.
        :param log_file: Optional path of a log file to read instead of stdout.
        :return: True only when the pass string was detected.
        """
        # universal_newlines=True opens the pipe in text mode, so readline()
        # yields str: the '' end-of-file check and the substring tests below
        # then work on Python 3 as well, where the pipe is bytes by default.
        # NOTE(review): stdout is piped even when a log file is monitored and
        # is never read in that case; a launcher that writes heavily to stdout
        # could block once the pipe buffer is full.
        self.process = subprocess.Popen(command, stdout=subprocess.PIPE, universal_newlines=True)

        # Original author's note: on Windows the function seemed to fail
        # because it polled before the process had a chance to start, and it
        # also failed if the log file was opened before it existed. A short
        # delay fixed the problem; 0.25 sec was too short.
        time.sleep(0.5)

        # The process may send its output to the log file instead of stdout,
        # in which case the log file is monitored instead.
        fp = open(log_file) if log_file else None

        message = ""
        result = False
        try:
            # Detect log output messages or timeout exceeded
            start = time.time()
            last_time = start
            screenshot_time_remaining = self.screenshots_interval
            screenshot_index = 0
            logger.info('Waiting for test to complete.')

            while True:
                if fp is not None:
                    line = fp.readline()
                else:
                    line = self.process.stdout.readline()

                # No output left and the process has exited: nothing to wait for.
                if line == '' and self.process.poll() is not None:
                    break

                if line:
                    logger.info(line.rstrip())
                    if pass_string in line:
                        message = "Detected {}. Test completed.".format(pass_string)
                        result = True
                        break
                    if fail_string in line:
                        message = "Detected {}. Test failed.".format(fail_string)
                        break
                else:
                    # Nothing new to read yet; wait briefly instead of spinning.
                    time.sleep(self._IDLE_POLL_SECONDS)

                if time.time() - start > timeout:
                    message = "Timeout of {} reached. Test failed.".format(timeout)
                    break

                # Count down to the next screenshot using wall-clock time.
                cur_time = time.time()
                screenshot_time_remaining = screenshot_time_remaining - (cur_time - last_time)
                last_time = cur_time
                if screenshot_time_remaining <= 0:
                    self.take_screenshot(os.path.join(
                        self.screenshots_folder,
                        "{}_screen{}".format(test_name.replace(' ', '_'), screenshot_index)))
                    screenshot_index = screenshot_index + 1
                    screenshot_time_remaining = self.screenshots_interval

            logger.info(message)
        finally:
            # Close the log file even when monitoring raised an exception.
            if fp is not None:
                fp.close()
        return result