#
# All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates or
# its licensors.
#
# For complete copyright and license terms please see the LICENSE at the root of this
# distribution (the "License"). All use of this software is governed by the License,
# or, if provided, by the license below or the license accompanying this file. Do not
# remove or modify any license notices. This file is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#

import os
import json
import subprocess
import re
import git_utils
from git_utils import Repo
from enum import Enum

# Matches a rename entry of the diff file: "R<number> <old path> <new path>"
_RENAME_PATTERN = re.compile(r"^R[0-9]+\s(\S+)\s(\S+)")

# Matches an addition, modification or deletion entry of the diff file: "<A|M|D> <path>"
# NOTE(review): '\S+' stops at the first whitespace, so a path containing spaces would be
# truncated - confirm against the format produced by git_utils.create_diff_file.
_CHANGE_PATTERN = re.compile(r"^[AMD]\s(\S+)")

# Maps the status letter of a diff entry to the change list category it belongs to
_STATUS_TO_CATEGORY = {
    "A": "createdFiles",
    "M": "updatedFiles",
    "D": "deletedFiles",
}


# Returns True if the specified child path is a child of the specified parent path, otherwise False
def is_child_path(parent_path, child_path):
    """Return True if child_path is located at or below parent_path.

    Both paths are made absolute (relative paths are resolved against the current
    working directory) before being compared, so a path is considered a child of itself.
    """
    parent_path = os.path.abspath(parent_path)
    child_path = os.path.abspath(child_path)
    try:
        return os.path.commonpath([parent_path]) == os.path.commonpath([parent_path, child_path])
    except ValueError:
        # commonpath raises when the paths live on different drives, in which case
        # the child cannot possibly be located below the parent
        return False


class TestImpact:
    """Drives the test impact analysis runtime for a given pipeline and destination commit."""

    def __init__(self, config_file, pipeline, dst_commit):
        """Parse the configuration and, where applicable, generate the change list.

        Args:
            config_file: Path to the JSON configuration file.
            pipeline: Name of the pipeline this run belongs to.
            dst_commit: Commit hash the change list is generated up to and that is
                recorded as the last run hash by the pipeline of truth.
        """
        self.__pipeline = pipeline
        self.__dst_commit = dst_commit
        self.__src_commit = None
        self.__has_src_commit = False
        # Always defined so that run() can rely on them even if no change list is generated
        self.__has_change_list = False
        self.__change_list_path = None
        self.__parse_config_file(config_file)
        # Only non pipeline of truth runs with test impact analysis enabled use a change list
        if self.__use_test_impact_analysis and not self.__is_pipeline_of_truth:
            self.__generate_change_list()

    # Parse the configuration file and retrieve the data needed for launching the test impact analysis runtime
    def __parse_config_file(self, config_file):
        """Read the JSON configuration file and cache the settings used by this class.

        Raises:
            FileNotFoundError: If test impact analysis is enabled and the configured
                tiaf binary does not exist (or if config_file itself cannot be opened).
            KeyError: If an expected configuration entry is missing.
        """
        print(f"Attempting to parse configuration file '{config_file}'...")
        with open(config_file, "r") as config_data:
            config = json.load(config_data)

        # Repository
        self.__repo_dir = config["repo"]["root"]

        # Jenkins
        self.__use_test_impact_analysis = config["jenkins"]["use_test_impact_analysis"]
        self.__pipeline_of_truth = config["jenkins"]["pipeline_of_truth"]
        print(f"Pipeline of truth: '{self.__pipeline_of_truth}'.")
        print(f"This pipeline: '{self.__pipeline}'.")
        # NOTE(review): 'in' is a membership test if the config value is a list but a
        # substring test if it is a string - confirm the expected type of this entry.
        self.__is_pipeline_of_truth = self.__pipeline in self.__pipeline_of_truth
        print(f"Is pipeline of truth: '{self.__is_pipeline_of_truth}'.")

        # TIAF binary
        self.__tiaf_bin = config["repo"]["tiaf_bin"]
        if self.__use_test_impact_analysis and not os.path.isfile(self.__tiaf_bin):
            raise FileNotFoundError("Could not find tiaf binary")

        # Workspaces
        self.__active_workspace = config["workspace"]["active"]["root"]
        self.__historic_workspace = config["workspace"]["historic"]["root"]
        self.__temp_workspace = config["workspace"]["temp"]["root"]

        # Last commit hash
        last_commit_hash_path_rel = config["workspace"]["historic"]["relative_paths"]["last_run_hash_file"]
        self.__last_commit_hash_path = os.path.join(self.__historic_workspace, last_commit_hash_path_rel)

        print("The configuration file was parsed successfully.")

    # Restricts change lists from checking in test impact analysis files
    def __check_for_restricted_files(self, file_path):
        """Raise ValueError if file_path lies inside the active, historic or temp workspace."""
        workspaces = (self.__active_workspace, self.__historic_workspace, self.__temp_workspace)
        if any(is_child_path(workspace, file_path) for workspace in workspaces):
            raise ValueError(f"Checking in test impact analysis framework files is illegal: '{file_path}''.")

    def __read_last_run_hash(self):
        """Load the commit hash of the last run, if the hash file exists and is not empty.

        Sets the source commit and the flag stating whether a source commit is available.
        """
        self.__has_src_commit = False
        if not os.path.isfile(self.__last_commit_hash_path):
            return

        with open(self.__last_commit_hash_path) as file:
            # Strip surrounding whitespace (e.g. a trailing newline) so that the
            # hash can be handed to git as is
            last_run_hash = file.read().strip()

        if not last_run_hash:
            print(f"Previous commit hash file '{self.__last_commit_hash_path}' is empty.")
            return

        print(f"Previous commit hash found at '{self.__last_commit_hash_path}'.")
        self.__src_commit = last_run_hash
        self.__has_src_commit = True

    def __write_last_run_hash(self, last_run_hash):
        """Overwrite the last run hash file with the specified commit hash."""
        with open(self.__last_commit_hash_path, "w") as file:
            file.write(last_run_hash)

    # Determines the change list between now and the last tiaf run (if any)
    def __generate_change_list(self):
        """Build the JSON change list between the last run commit and the destination commit.

        On success the change list is written to 'changelist.json' in the temp workspace
        and flagged as available. If there is no previous commit hash, the commits are
        not related or the diff could not be created, no change list is made available
        (none of these cases is treated as a failure).

        Raises:
            ValueError: If the diff contains a file located inside one of the test
                impact analysis workspaces.
        """
        self.__has_change_list = False
        self.__change_list_path = None

        # Check whether or not a previous commit hash exists (no hash is not a failure)
        self.__read_last_run_hash()
        if not self.__has_src_commit:
            print("No previous commit hash found, regular or seeded sequences only will be run.")
            return

        if not git_utils.is_descendent(self.__src_commit, self.__dst_commit):
            print(f"Source commit '{self.__src_commit}' and destination commit '{self.__dst_commit}' are not related.")
            return

        diff_path = os.path.join(self.__temp_workspace, "changelist.diff")
        try:
            git_utils.create_diff_file(self.__src_commit, self.__dst_commit, diff_path)
        except FileNotFoundError as e:
            print(e)
            return

        # A diff was generated, attempt to parse the diff and construct the change list
        print(f"Generated diff between commits '{self.__src_commit}' and '{self.__dst_commit}': '{diff_path}'.")
        change_list = {"createdFiles": [], "updatedFiles": [], "deletedFiles": []}
        with open(diff_path, "r") as diff_data:
            for line in diff_data:
                rename = _RENAME_PATTERN.match(line)
                if rename:
                    # File rename, treated as a deletion and an addition
                    old_path, new_path = rename.groups()
                    self.__check_for_restricted_files(old_path)
                    self.__check_for_restricted_files(new_path)
                    change_list["deletedFiles"].append(old_path)
                    change_list["createdFiles"].append(new_path)
                    continue

                change = _CHANGE_PATTERN.match(line)
                if not change:
                    # Not an addition, modification or deletion entry (e.g. a blank line), skip it
                    continue

                # File addition, modification or deletion
                file_path = change.group(1)
                self.__check_for_restricted_files(file_path)
                change_list[_STATUS_TO_CATEGORY[line[0]]].append(file_path)

        # Serialize the change list to the JSON format the test impact analysis runtime expects
        change_list_path = os.path.join(self.__temp_workspace, "changelist.json")
        with open(change_list_path, "w") as change_list_file:
            change_list_file.write(json.dumps(change_list, indent=4))
        print(f"Change list constructed successfully: '{change_list_path}'.")
        print(f"{len(change_list['createdFiles'])} created files, {len(change_list['updatedFiles'])} updated files and {len(change_list['deletedFiles'])} deleted files.")

        # Note: an empty change list generated due to no changes between last and current commit is valid
        self.__has_change_list = True
        self.__change_list_path = change_list_path

    # Runs the specified test sequence
    def run(self, suite, safe_mode, test_timeout, global_timeout):
        """Launch the test impact analysis runtime with arguments derived from the configuration.

        Args:
            suite: Test suite to run, passed on as '--suite'.
            safe_mode: Whether safe mode is switched on; only passed on for
                'tianowrite' sequences.
            test_timeout: Value passed on as '--ttimeout', or None to omit it.
            global_timeout: Value passed on as '--gtimeout', or None to omit it.

        Returns:
            The return code of the test impact analysis runtime process.
        """
        args = []

        # Suite
        args.append(f"--suite={suite}")
        print(f"Test suite is set to '{suite}'.")

        # Timeouts
        if test_timeout is not None:
            args.append(f"--ttimeout={test_timeout}")
            print(f"Test target timeout is set to {test_timeout} seconds.")
        if global_timeout is not None:
            args.append(f"--gtimeout={global_timeout}")
            print(f"Global sequence timeout is set to {global_timeout} seconds.")

        # If test impact analysis is enabled:
        # -> Pipeline of truth will perform a seed that will continue until the sequence is complete regardless of test failures
        # -> Non pipeline of truth will attempt to perform an impact analysis sequence and exit early upon the first test failure
        # If test impact analysis is disabled:
        # -> Pipeline of truth will perform a regular sequence that will continue until the sequence is complete regardless of test failures
        # -> Non pipeline of truth will perform a regular sequence and exit early upon the first test failure
        if self.__use_test_impact_analysis:
            print("Test impact analysis ie enabled.")
            if self.__is_pipeline_of_truth:
                # Pipeline of truth sequence
                # Sequence type
                args.append("--sequence=seed")
                print("Sequence type is set to 'seed'.")
                # Test failure policy
                args.append("--fpolicy=continue")
                print("Test failure policy is set to 'continue'.")
            else:
                # Non pipeline of truth sequence
                if self.__has_change_list:
                    # Change list
                    args.append(f"--changelist={self.__change_list_path}")
                    print(f"Change list is set to '{self.__change_list_path}'.")
                    # Sequence type
                    args.append("--sequence=tianowrite")
                    print("Sequence type is set to 'tianowrite'.")
                    # Safe mode
                    if safe_mode:
                        args.append("--safemode=on")
                        print("Safe mode set to 'on'.")
                    else:
                        args.append("--safemode=off")
                        print("Safe mode set to 'off'.")
                else:
                    # Without a change list there is nothing to analyze, fall back to a regular sequence
                    args.append("--sequence=regular")
                    print("Sequence type is set to 'regular'.")
                # Test failure policy
                args.append("--fpolicy=abort")
                print("Test failure policy is set to 'abort'.")
        else:
            print("Test impact analysis ie disabled.")
            # Sequence type
            args.append("--sequence=regular")
            print("Sequence type is set to 'regular'.")
            if self.__is_pipeline_of_truth:
                # Pipeline of truth sequence
                # Test failure policy
                args.append("--fpolicy=continue")
                print("Test failure policy is set to 'continue'.")
            else:
                # Non pipeline of truth sequence
                # Test failure policy
                args.append("--fpolicy=abort")
                print("Test failure policy is set to 'abort'.")

        print("Args: ", end='')
        print(*args)
        result = subprocess.run([self.__tiaf_bin] + args)

        # If the sequence completed (with or without failures) we will update the historical meta-data
        # NOTE(review): 7 is presumably the runtime's "completed with test failures" code - confirm
        if result.returncode in (0, 7):
            print("Test impact analysis runtime returned successfully.")
            if self.__is_pipeline_of_truth:
                print("Writing historical meta-data...")
                self.__write_last_run_hash(self.__dst_commit)
            print("Complete!")
        else:
            print(f"The test impact analysis runtime returned with error: '{result.returncode}'.")
        return result.returncode