You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
o3de/AutomatedTesting/Gem/PythonTests/automatedtesting_shared/asset_database_utils.py

195 lines
7.3 KiB
Python

"""
All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates or
its licensors.
For complete copyright and license terms please see the LICENSE at the root of this
distribution (the "License"). All use of this software is governed by the License,
or, if provided, by the license below or the license accompanying this file. Do not
remove or modify any license notices. This file is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
"""
import binascii
from dataclasses import dataclass, field
import logging
import sqlite3
import os
from typing import List
import ly_test_tools.o3de.pipeline_utils as pipeline_utils
# Column index of ProductID within a row returned by a query that selects
# ProductID as its first column from the Products table (see get_product_id).
PRODUCT_ID_INDEX = 0
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def do_select(asset_db_path, cmd):
    """
    Run a single SQL query against the asset database and return every row.

    :param asset_db_path: Path of the SQLite database file to open
    :param cmd: SQL statement to execute
    :return: List of rows produced by fetchall(), or an empty list if sqlite3
        raised an error (the error is printed, not re-raised)
    """
    connection = None
    try:
        connection = sqlite3.connect(asset_db_path)
        return connection.execute(cmd).fetchall()
    except sqlite3.Error as sqlite_error:
        print(f'select on db {asset_db_path} failed with exception {sqlite_error}')
        return []
    finally:
        # Release the database handle on the failure path as well; otherwise a
        # failed query leaves the connection (and the file handle) open.
        if connection is not None:
            connection.close()
def get_active_platforms_from_db(asset_db_path) -> List[str]:
    """Return the distinct Platform values found in the Jobs table, i.e. the platforms jobs were run for."""
    active_platforms = []
    for row in do_select(asset_db_path, "select distinct Platform from Jobs"):
        # Every row carries exactly the one selected column; keep only its value.
        active_platforms.append(row[0])
    return active_platforms
# Build the database form of a product path from a source product path:
# cache_platform/projectname/product_path
def get_db_product_path(workspace, source_path, cache_platform):
    """
    Join cache_platform, the workspace's project name and source_path into one path.

    :param workspace: Object exposing the project name as ``workspace.project``
    :param source_path: Product path relative to the project
    :param cache_platform: Platform folder name used as the first path component
    :return: The joined path with every backslash replaced by a forward slash
    """
    return os.path.join(cache_platform, workspace.project, source_path).replace('\\', '/')
def get_product_id(asset_db_path, product_name):
    """
    Look up the ProductID of the product whose ProductName equals product_name.

    :param asset_db_path: Path of the SQLite asset database file
    :param product_name: Value matched against the ProductName column of Products
    :return: The ProductID column of the first matching row, or an empty list
        when no row matches (do_select also yields no rows if the query fails)
    """
    # Double any single quote so a name such as "it's.png" stays one valid SQL
    # string literal instead of breaking the statement.
    escaped_name = str(product_name).replace("'", "''")
    matching_rows = do_select(asset_db_path, f"SELECT ProductID FROM Products where ProductName='{escaped_name}'")
    if not matching_rows:
        return []  # callers receive an empty list when nothing was found
    return matching_rows[0][PRODUCT_ID_INDEX]  # Get product id from 'first' row
# Look up a product_id from a source_path, assuming the source is copied into the cache
# under the same name, or from a product name given without cache_platform or projectname prepended
def get_product_id_from_relative(workspace, source_path, asset_platform):
    """
    Resolve source_path to its database product path and return that product's id.

    :param workspace: Object providing ``paths.asset_db()`` and the project name
    :param source_path: Product path relative to the project
    :param asset_platform: Platform folder name the product path is prefixed with
    :return: Whatever get_product_id returns for the resolved product path
    """
    asset_db_path = workspace.paths.asset_db()
    db_product_path = get_db_product_path(workspace, source_path, asset_platform)
    return get_product_id(asset_db_path, db_product_path)
def get_missing_dependencies(asset_db_path, product_id) -> List[str]:
    """
    Fetch every MissingProductDependencies row whose ProductPK equals product_id.

    :param asset_db_path: Path of the SQLite asset database file
    :param product_id: Value compared against the ProductPK column
    :return: A list of the rows handed back by do_select (empty when none match)
    """
    query = f"SELECT * FROM MissingProductDependencies where ProductPK={product_id}"
    rows = do_select(asset_db_path, query)
    return list(rows)
def do_single_transaction(asset_db_path, cmd):
    """
    Execute one SQL statement against the asset database and commit it.

    :param asset_db_path: Path of the SQLite database file to open
    :param cmd: SQL statement to execute
    :return: None; an sqlite3 error is printed rather than raised
    """
    connection = None
    try:
        connection = sqlite3.connect(asset_db_path)
        cursor = connection.cursor()  # SQL cursor used for issuing commands
        cursor.execute(cmd)
        connection.commit()  # Save changes
    except sqlite3.Error as sqlite_error:
        print(f'transaction on db {asset_db_path} cmd {cmd} failed with exception {sqlite_error}')
    finally:
        # Close the handle even when execute/commit failed, so a bad statement
        # does not leave the database file open.
        if connection is not None:
            connection.close()
def clear_missing_dependencies(asset_db_path, product_id) -> None:
    """
    Delete the MissingProductDependencies rows whose ProductPK equals product_id.

    :param asset_db_path: Path of the SQLite asset database file
    :param product_id: Value compared against the ProductPK column
    """
    delete_cmd = f"DELETE FROM MissingProductDependencies where ProductPK={product_id}"
    do_single_transaction(asset_db_path, delete_cmd)
def clear_all_missing_dependencies(asset_db_path) -> None:
    """
    Delete every row of the MissingProductDependencies table.

    :param asset_db_path: Path of the SQLite asset database file
    """
    delete_all_cmd = "DELETE FROM MissingProductDependencies;"
    do_single_transaction(asset_db_path, delete_all_cmd)
@dataclass
class DBProduct:
    """One product record, populated from a row of the Products table."""
    # ProductName column
    product_name: str = None
    # SubID column
    sub_id: int = 0
    # AssetType column, stored hex-encoded by get_db_source_job_product_info
    asset_type: str = None
@dataclass
class DBJob:
    """One job record, populated from a row of the Jobs table, plus the products it owns."""
    # JobKey column
    job_key: str = None
    # BuilderGuid column, stored hex-encoded by get_db_source_job_product_info
    builder_guid: str = None
    # Status column
    status: int = 0
    # ErrorCount column
    error_count: int = 0
    # WarningCount column
    warning_count: int = 0
    # Platform column
    platform: str = None
    # Products attached to this job; each instance gets its own fresh list
    products: List[DBProduct] = field(default_factory=list)
@dataclass
class DBSourceAsset:
    """One source asset record, populated from a row of the Sources table, plus its jobs."""
    # Name the source was looked up by
    source_file_name: str = None
    # SourceGuid column, stored hex-encoded by get_db_source_job_product_info
    uuid: str = None
    # ScanFolderPK column; left out of equality comparisons
    scan_folder_key: str = field(compare=False, default=None)
    # SourceID column; left out of equality comparisons
    id: str = field(compare=False, default=None)
    # Jobs attached to this source; each instance gets its own fresh list
    jobs: List[DBJob] = field(default_factory=list)
def compare_expected_asset_to_actual_asset(asset_db_path, expected_asset: DBSourceAsset, asset_path, cache_root):
    """
    Assert that the database's record of asset_path matches expected_asset.

    The UUID must match, and every job in expected_asset.jobs must have a job
    with the same job_key and platform in the database that compares equal to
    it. Jobs in the database that were not expected are ignored.

    :param asset_db_path: Path of the SQLite asset database file
    :param expected_asset: The source asset, with jobs and products, that is expected
    :param asset_path: Source name used to look the asset up in the database
    :param cache_root: Forwarded unchanged to get_db_source_job_product_info
    :raises AssertionError: If the UUID differs, an expected job is missing, or a
        matched job is not equal to the expected one
    """
    actual_asset = get_db_source_job_product_info(asset_db_path, asset_path, cache_root)

    assert expected_asset.uuid == actual_asset.uuid, \
        f"UUID for asset {expected_asset.source_file_name} is '{actual_asset.uuid}', but expected '{expected_asset.uuid}'"

    for expected_job in expected_asset.jobs:
        # Resolve the match afresh for every expected job. A flag reset inside the
        # search loop would be unbound, or still hold the previous job's result,
        # whenever there are no actual jobs left to iterate over.
        actual_job = next(
            (candidate for candidate in actual_asset.jobs
             if candidate.job_key == expected_job.job_key and candidate.platform == expected_job.platform),
            None)
        assert actual_job is not None, f"For asset {expected_asset.source_file_name}, could not find job with key '{expected_job.job_key}' and platform '{expected_job.platform}'"
        assert expected_job == actual_job, \
            f"Expected job did not match actual job for key {expected_job.job_key} and platform {expected_job.platform}.\nExpected: {expected_job}\nActual: {actual_job}"
        # Remove the found job to speed up other searches.
        actual_asset.jobs.remove(actual_job)
    # Don't assert on any actual jobs remaining, they could be for platforms not checked by this test, or new job keys not yet handled.
def get_db_source_job_product_info(asset_db_path, filename, cache_root):
    """
    Build a DBSourceAsset for filename, including its jobs and each job's products.

    :param asset_db_path: Path of the SQLite asset database file
    :param filename: Source name used to find the row in the Sources table
    :param cache_root: Accepted but not used by this function
    :return: A DBSourceAsset whose GUID-like columns (source uuid, builder guid,
        asset type) have been passed through binascii.hexlify
    """
    source_row = get_source_info_from_filename(asset_db_path, filename)
    # Row layout: SourceID, ScanFolderPK, SourceGuid
    source = DBSourceAsset(
        source_file_name=filename,
        uuid=binascii.hexlify(source_row[2]),
        scan_folder_key=source_row[1],
        id=source_row[0],
    )

    for job_row in get_jobs_for_source(asset_db_path, source.id):
        # Row layout: JobID, JobKey, Fingerprint, Platform, BuilderGuid, Status, ErrorCount, WarningCount
        job = DBJob(
            job_key=job_row[1],
            platform=job_row[3],
            builder_guid=binascii.hexlify(job_row[4]),
            status=job_row[5],
            error_count=job_row[6],
            warning_count=job_row[7],
        )
        # Row layout: ProductID, ProductName, SubID, AssetType
        job.products.extend(
            DBProduct(
                product_name=product_row[1],
                sub_id=product_row[2],
                asset_type=binascii.hexlify(product_row[3]),
            )
            for product_row in get_products_for_job(asset_db_path, job_row[0])
        )
        source.jobs.append(job)

    return source
def get_source_info_from_filename(asset_db_path, filename):
    """
    Fetch the single Sources row whose SourceName equals filename.

    :param asset_db_path: Path of the SQLite asset database file
    :param filename: Value matched against the SourceName column
    :return: The matching row as (SourceID, ScanFolderPK, SourceGuid)
    :raises AssertionError: If zero rows or more than one row match
    """
    # Use a single-quoted SQL string literal with doubled quotes: double quotes
    # denote identifiers in SQL, and a '"' inside the name would end the literal.
    escaped_filename = str(filename).replace("'", "''")
    sources = do_select(asset_db_path,
                        f"SELECT SourceID,ScanFolderPK,SourceGuid FROM Sources where SourceName='{escaped_filename}'")
    assert len(sources) == 1, f"Zero or multiple source assets found when only one was expected for '{filename}'"
    return sources[0]
def get_jobs_for_source(asset_db_path, source_id):
    """
    Fetch the Jobs rows that belong to a source, ordered by JobKey.

    :param asset_db_path: Path of the SQLite asset database file
    :param source_id: Value compared against the SourcePK column
    :return: Rows of (JobID, JobKey, Fingerprint, Platform, BuilderGuid, Status,
        ErrorCount, WarningCount) as returned by do_select
    """
    query = f"SELECT JobID,JobKey,Fingerprint,Platform,BuilderGuid,Status,ErrorCount,WarningCount FROM Jobs where SourcePK={source_id} order by JobKey"
    return do_select(asset_db_path, query)
def get_products_for_job(asset_db_path, job_id):
    """
    Fetch the Products rows that belong to a job, ordered by ProductName.

    :param asset_db_path: Path of the SQLite asset database file
    :param job_id: Value compared against the JobPK column
    :return: Rows of (ProductID, ProductName, SubID, AssetType) as returned by do_select
    """
    query = f"SELECT ProductID,ProductName,SubID,AssetType FROM Products where JobPK={job_id} order by ProductName"
    return do_select(asset_db_path, query)