You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
o3de/scripts/build/bootstrap/incremental_build_util.py

546 lines
21 KiB
Python

# Copyright (c) Contributors to the Open 3D Engine Project.
# For complete copyright and license terms please see the LICENSE at the root of this distribution.
#
# SPDX-License-Identifier: Apache-2.0 OR MIT
#
import argparse
import boto3
import datetime
import urllib.request, urllib.error, urllib.parse
import os
import psutil
import time
import subprocess
import sys
import tempfile
from contextlib import contextmanager
import threading
import _thread
from botocore.config import Config
# Fallback AWS region, used by get_region_name() when the boto3 session has none configured.
DEFAULT_REGION = 'us-west-2'

# Default volume size (the --disk_size help text states Gigabytes) and volume type.
DEFAULT_DISK_SIZE = 300
DEFAULT_DISK_TYPE = 'gp2'

# Default duration handed to the timeout() context manager (a threading.Timer interval, i.e. seconds).
DEFAULT_TIMEOUT = 300

# NOTE(review): not referenced anywhere in the visible part of this file.
MAX_EBS_MOUNTING_ATTEMPT = 3

# Free-space floor; mount_ebs() compares it against get_free_space_mb(), so the unit is MB.
LOW_EBS_DISK_SPACE_LIMIT = 10240

# Upper bound for an automatically grown volume, same unit as DEFAULT_DISK_SIZE.
MAX_EBS_DISK_SIZE = DEFAULT_DISK_SIZE * 2

# Location where the build volume is mounted on this host.
MOUNT_PATH = 'D:\\' if os.name == 'nt' else '/data'
if os.name == 'nt':
    import ctypes
    import win32api
    import collections
    import locale

    # Switch to the user's default locale so the '{:n}' format used by UsageTuple
    # emits thousands separators.
    locale.setlocale(locale.LC_ALL, '')

    PULARGE_INTEGER = ctypes.POINTER(ctypes.c_ulonglong)  # Pointer to large unsigned integer
    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
    kernel32.GetDiskFreeSpaceExW.argtypes = (ctypes.c_wchar_p,) + (PULARGE_INTEGER,) * 3

    class UsageTuple(collections.namedtuple('UsageTuple', 'total, used, free')):
        """(total, used, free) triple whose ``str()`` uses locale-aware number grouping."""

        def __str__(self):
            # Add thousands separator to numbers displayed
            return self.__class__.__name__ + '(total={:n}, used={:n}, free={:n})'.format(*self)

    def is_dir_symlink(path):
        """Return a truthy value when *path* is a directory that has the reparse-point attribute set.

        The result is ``False`` for a non-directory, otherwise the masked attribute
        bits (an int, non-zero when the flag is present).
        """
        FILE_ATTRIBUTE_REPARSE_POINT = 0x0400
        return os.path.isdir(path) and (
            ctypes.windll.kernel32.GetFileAttributesW(str(path)) & FILE_ATTRIBUTE_REPARSE_POINT)

    def get_free_space_mb(path):
        """Return the free space, in MB (bytes / 1024 / 1024), of the disk containing *path*.

        Args:
            path: str, bytes or os.PathLike naming a directory on the disk to query.

        Raises:
            OSError: built by ``ctypes.WinError`` when ``GetDiskFreeSpaceExW`` reports failure.
        """
        # This file already requires Python 3.6+ (f-strings), so the old Python 2
        # conversion-mode handling was unreachable and has been dropped.
        path = os.fsdecode(path)

        # Variables that receive results when passed as "by reference" arguments
        available, total, free = ctypes.c_ulonglong(), ctypes.c_ulonglong(), ctypes.c_ulonglong()

        success = kernel32.GetDiskFreeSpaceExW(
            path, ctypes.byref(available), ctypes.byref(total), ctypes.byref(free))
        if not success:
            # Read the error code right after the failing call, before anything can overwrite it.
            error_code = ctypes.get_last_error()
            windows_error_message = ctypes.FormatError(error_code)
            raise ctypes.WinError(error_code, '{} {!r}'.format(windows_error_message, path))

        # Reports the value written to the API's last output argument.
        return free.value / 1024 / 1024
else:
    def get_free_space_mb(dirname):
        """Return the free space, in MB, of the filesystem containing *dirname*.

        Computed from ``os.statvfs`` as ``f_bavail * f_frsize`` bytes, divided by 1024 twice.
        """
        st = os.statvfs(dirname)
        return st.f_bavail * st.f_frsize / 1024 / 1024
def error(message):
    """Print *message* and terminate the process with exit status 1.

    ``sys.exit`` is used instead of the ``exit`` builtin: the latter is injected by the
    ``site`` module for interactive sessions and is absent when the interpreter is
    started without it (for example ``python -S``).

    Raises:
        SystemExit: always, with code 1.
    """
    print(message)
    sys.exit(1)
@contextmanager
def timeout(duration, timeout_message):
    """Context manager that aborts its body with ``TimeoutError`` after *duration* seconds.

    A ``threading.Timer`` calls ``_thread.interrupt_main()`` on expiry, which surfaces in
    the main thread as ``KeyboardInterrupt``; that interrupt is translated into
    ``TimeoutError`` here.

    Args:
        duration: timer interval passed to ``threading.Timer``.
        timeout_message: text printed (and attached to the exception) on expiry.

    Raises:
        TimeoutError: the timer fired before the body finished.
        KeyboardInterrupt: the body was interrupted by something other than the timer
            (e.g. a real Ctrl+C); it is re-raised unchanged rather than being
            misreported as a timeout.
    """
    expired = threading.Event()

    def _expire():
        # Record that the interrupt originates from the timer before delivering it.
        expired.set()
        _thread.interrupt_main()

    timer = threading.Timer(duration, _expire)
    timer.start()
    try:
        yield
    except KeyboardInterrupt:
        if not expired.is_set():
            raise
        print(timeout_message)
        raise TimeoutError(timeout_message) from None
    finally:
        # If the action ends in specified time, timer is canceled
        timer.cancel()
def print_drives():
    """Print the logical drive strings reported by win32api; a no-op when not on Windows."""
    if os.name != 'nt':
        return
    raw_drive_strings = win32api.GetLogicalDriveStrings()
    # The string is NUL-separated with a trailing NUL, hence the dropped last element.
    print(raw_drive_strings.split('\000')[:-1])
def parse_args():
    """Parse and validate the command-line options.

    Returns:
        argparse.Namespace with ``action`` lower-cased and ``disk_size`` as an int.

    The process exits through ``error`` when no action is given, or when any of the
    repository/project/pipeline/branch/platform/build_type options is missing for an
    action other than ``unmount``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-a', '--action', dest="action", help="Action (mount|unmount|delete)")
    parser.add_argument('-snapshot-hint', '--snapshot-hint', dest="snapshot_hint", help="Build snapshot to attempt to use")
    parser.add_argument('-repository_name', '--repository_name', dest="repository_name", help="Repository name")
    parser.add_argument('-project', '--project', dest="project", help="Project")
    parser.add_argument('-pipe', '--pipeline', dest="pipeline", help="Pipeline")
    parser.add_argument('-b', '--branch', dest="branch", help="Branch")
    parser.add_argument('-plat', '--platform', dest="platform", help="Platform")
    parser.add_argument('-c', '--build_type', dest="build_type", help="Build type")
    # type=int matters: without it a size given on the command line arrives as a str, which
    # never compares equal to the integer sizes returned by EC2 and breaks size arithmetic.
    parser.add_argument('-ds', '--disk_size', dest="disk_size", type=int,
                        help=f"Disk size in Gigabytes (defaults to {DEFAULT_DISK_SIZE})", default=DEFAULT_DISK_SIZE)
    parser.add_argument('-dt', '--disk_type', dest="disk_type", help=f"Disk type (defaults to {DEFAULT_DISK_TYPE})",
                        default=DEFAULT_DISK_TYPE)
    args = parser.parse_args()

    # Input validation
    if args.action is None:
        error('No action specified')
    args.action = args.action.lower()
    if args.action != 'unmount':
        # Everything except unmount needs the full set of identifiers that make up the volume name.
        required_options = (
            ('repository_name', 'No repository specified'),
            ('project', 'No project specified'),
            ('pipeline', 'No pipeline specified'),
            ('branch', 'No branch specified'),
            ('platform', 'No platform specified'),
            ('build_type', 'No build_type specified'),
        )
        for option_name, complaint in required_options:
            if getattr(args, option_name) is None:
                error(complaint)
    return args
def get_mount_name(repository_name, project, pipeline, branch, platform, build_type):
    """Build the volume name: the six identifiers joined by '_' with '/' and '\\' turned into '_'."""
    raw_name = f"{repository_name}_{project}_{pipeline}_{branch}_{platform}_{build_type}"
    # Both kinds of path separator are mapped to underscores in a single pass.
    return raw_name.translate(str.maketrans('/\\', '__'))
def get_pipeline_and_branch(pipeline, branch):
    """Return '<pipeline>_<branch>' with every '/' and '\\' replaced by '_'."""
    combined = f"{pipeline}_{branch}"
    for separator in ('/', '\\'):
        combined = combined.replace(separator, '_')
    return combined
def get_region_name():
    """Return the region configured for a fresh boto3 session, or DEFAULT_REGION when it is None."""
    configured_region = boto3.session.Session().region_name
    return DEFAULT_REGION if configured_region is None else configured_region
def get_ec2_client(region):
    """Create a boto3 EC2 client bound to *region* with the 'standard' retry mode."""
    retry_policy = {'mode': 'standard'}
    return boto3.client('ec2', config=Config(region_name=region, retries=retry_policy))
def get_ec2_resource(region):
    """Create a boto3 EC2 service resource bound to *region* with the 'standard' retry mode."""
    settings = Config(region_name=region, retries=dict(mode='standard'))
    ec2 = boto3.resource('ec2', config=settings)
    return ec2
def _read_instance_metadata(item, request_timeout):
    """Return the UTF-8 decoded body of the instance metadata entry *item*."""
    url = f'http://169.254.169.254/latest/meta-data/{item}'
    # The context manager closes the HTTP response; the timeout keeps the call from
    # blocking indefinitely when the metadata address is unreachable.
    with urllib.request.urlopen(url, timeout=request_timeout) as response:
        return response.read().decode("utf-8")


def get_ec2_instance_id(request_timeout=10):
    """Return this machine's EC2 instance id as read from the instance metadata endpoint.

    Args:
        request_timeout: seconds to wait for the metadata endpoint before giving up.

    Any failure is printed and the process exits through ``error``.
    """
    try:
        return _read_instance_metadata('instance-id', request_timeout)
    except Exception as e:
        # Deliberately broad: any failure here is treated as "no metadata available".
        print(e)
        error('No EC2 metadata! Check if you are running this script on an EC2 instance.')


def get_availability_zone(request_timeout=10):
    """Return this machine's availability zone as read from the instance metadata endpoint.

    Args:
        request_timeout: seconds to wait for the metadata endpoint before giving up.

    Any failure is printed and the process exits through ``error``.
    """
    try:
        return _read_instance_metadata('placement/availability-zone', request_timeout)
    except Exception as e:
        # Deliberately broad: any failure here is treated as "no metadata available".
        print(e)
        error('No EC2 metadata! Check if you are running this script on an EC2 instance.')
def kill_processes(workspace='/dev/'):
    """
    Kills all processes that have an open file whose path contains the workspace.
    Uses PSUtil for cross-platform compatibility

    Args:
        workspace: path fragment looked for inside each open file path.

    Processes that vanish, deny access or are zombies while being inspected are skipped.
    """
    print('Checking for any stuck processes...')
    # Compare against the real paths rather than str() of the open-file list: the list's
    # repr escapes backslashes, so a Windows workspace such as 'D:\workspace' could never
    # match it. normcase also irons out case and slash differences on Windows and is a
    # no-op on POSIX.
    wanted = os.path.normcase(workspace)
    for proc in psutil.process_iter():
        try:
            open_files = proc.open_files()
            if any(wanted in os.path.normcase(entry.path) for entry in open_files):
                print(f"{proc.name()} has open files in {open_files}. Terminating")
                proc.kill()
                time.sleep(1)  # Just to make sure a parent process has time to close
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue
def delete_volume(ec2_client, volume_id):
    """Ask *ec2_client* to delete the volume *volume_id*, then print a confirmation line."""
    ec2_client.delete_volume(VolumeId=volume_id)
    print('Volume {} deleted'.format(volume_id))
def find_snapshot_id(ec2_client, snapshot_hint, repository_name, project, pipeline, platform, build_type, disk_size):
    """Return the id of the newest usable snapshot for the hinted build, or None.

    Snapshots are looked up by their ``Name`` tag, which is the mount name built with
    *snapshot_hint* in the branch position. A snapshot is usable when its state is
    'completed' and its volume size equals *disk_size*; among those, the one with the
    latest ``StartTime`` wins.
    """
    tag_name = get_mount_name(repository_name, project, pipeline, snapshot_hint, platform, build_type)
    name_filter = {'Name': 'tag:Name', 'Values': [tag_name]}
    response = ec2_client.describe_snapshots(Filters=[name_filter])
    if 'Snapshots' not in response:
        return None

    usable = [
        candidate for candidate in response['Snapshots']
        if candidate['State'] == 'completed' and candidate['VolumeSize'] == disk_size
    ]
    if not usable:
        return None
    # max() keeps the first of equally recent snapshots, like a strict '>' comparison would.
    newest = max(usable, key=lambda candidate: candidate['StartTime'])
    return newest['SnapshotId']
def offline_drive(disk_number=1):
    """Use diskpart to offline a Windows drive

    Args:
        disk_number: disk index placed in the diskpart ``select disk`` command.

    The commands are written to a temporary script file that is handed to
    ``diskpart /s``. The file is closed before diskpart runs, so its contents are
    flushed, and it is removed even when diskpart cannot be started.
    """
    script = f"\nselect disk {disk_number}\noffline disk\n".encode('utf-8')
    with tempfile.NamedTemporaryFile(delete=False) as f:
        f.write(script)
    try:
        subprocess.run(['diskpart', '/s', f.name])
    finally:
        os.unlink(f.name)
def create_volume(ec2_client, availability_zone, snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type):
    """Create an encrypted, tagged EBS volume and wait until it is 'available'.

    The volume is restored from the snapshot located by ``find_snapshot_id`` when there
    is one; otherwise an empty volume of *disk_size* is requested.

    Returns:
        (volume_id, created) where ``created`` is True for an empty new volume and
        False for one restored from a snapshot.

    Raises:
        TimeoutError: the volume did not become available within DEFAULT_TIMEOUT.
    """
    mount_name = get_mount_name(repository_name, project, pipeline, branch, platform, build_type)
    pipeline_and_branch = get_pipeline_and_branch(pipeline, branch)
    tag_pairs = (
        ('Name', mount_name),
        ('RepositoryName', repository_name),
        ('Project', project),
        ('Pipeline', pipeline),
        ('BranchName', branch),
        ('Platform', platform),
        ('BuildType', build_type),
        # Used so the snapshoting easily identifies which volumes to snapshot
        ('PipelineAndBranch', pipeline_and_branch),
    )
    parameters = {
        'AvailabilityZone': availability_zone,
        'VolumeType': disk_type,
        'Encrypted': True,
        'TagSpecifications': [{
            'ResourceType': 'volume',
            'Tags': [{'Key': key, 'Value': value} for key, value in tag_pairs],
        }],
    }

    # The actual EBS default calculation for IOps is a floating point number,
    # the closest approximation is 4x of the disk size for simplicity
    if 'io1' in disk_type.lower():
        parameters['Iops'] = (4 * disk_size)

    snapshot_id = find_snapshot_id(ec2_client, snapshot_hint, repository_name, project, pipeline, platform, build_type, disk_size)
    created = not snapshot_id
    if created:
        # If no snapshot id, we need to specify the size
        parameters['Size'] = disk_size
    else:
        parameters['SnapshotId'] = snapshot_id

    volume_id = ec2_client.create_volume(**parameters)['VolumeId']

    # give some time for the creation call to complete
    time.sleep(1)

    response = ec2_client.describe_volumes(VolumeIds=[volume_id, ])
    with timeout(DEFAULT_TIMEOUT, 'ERROR: Timeout reached trying to create EBS.'):
        while True:
            if response['Volumes'][0]['State'] == 'available':
                break
            time.sleep(1)
            response = ec2_client.describe_volumes(VolumeIds=[volume_id, ])

    print(f"Volume {volume_id} created\n\tSnapshot: {snapshot_id}\n\tRepository {repository_name}\n\t"
          f"Project {project}\n\tPipeline {pipeline}\n\tBranch {branch}\n\tPlatform: {platform}\n\tBuild type: {build_type}")
    return volume_id, created
def mount_volume_to_device(created):
    """Bring the attached volume online and mount it, formatting it first when *created* is true.

    On Windows a diskpart script is run against disk 1; elsewhere the block device is
    mounted at MOUNT_PATH, preferring /dev/nvme1n1 over /dev/xvdf when it exists.
    """
    print('Mounting volume...')

    if os.name != 'nt':
        nvme_device_name = '/dev/nvme1n1'
        device_name = nvme_device_name if os.path.exists(nvme_device_name) else '/dev/xvdf'
        subprocess.call(['file', '-s', device_name])
        if created:
            subprocess.call(['mkfs', '-t', 'ext4', device_name])
        subprocess.call(['mount', device_name, MOUNT_PATH])
        return

    # Verify drive is in an offline state.
    # Some Windows configs will automatically set new drives as online causing diskpart setup script to fail.
    offline_drive()

    online_commands = "\nselect disk 1\nonline disk\nattribute disk clear readonly\n"  # assume disk # for now
    format_commands = "create partition primary\nselect partition 1\nformat quick fs=ntfs\nassign\nactive\n"
    with tempfile.NamedTemporaryFile(delete=False) as script_file:
        script_file.write(online_commands.encode('utf-8'))
        if created:
            print('Creating filesystem on new volume')
            script_file.write(format_commands.encode('utf-8'))

    subprocess.call(['diskpart', '/s', script_file.name])
    time.sleep(5)
    print_drives()
    os.unlink(script_file.name)
    time.sleep(1)
def attach_volume_to_ec2_instance(volume, volume_id, instance_id, timeout_duration=DEFAULT_TIMEOUT):
    """Attach *volume* to *instance_id* as device 'xvdf' and wait until it reports 'attached'.

    Args:
        volume: EC2 Volume resource for *volume_id*.
        volume_id: id of the volume being attached.
        instance_id: id of the target instance.
        timeout_duration: seconds to wait for the attachment before giving up.

    Raises:
        TimeoutError: the attachment did not reach the 'attached' state in time.

    Once attached, the volume is tagged with today's date under 'AttachDate'.
    """
    print(f'Attaching volume {volume_id} to instance {instance_id}')
    volume.attach_to_instance(Device='xvdf',
                              InstanceId=instance_id,
                              VolumeId=volume_id)
    # give a little bit of time for the aws call to process
    time.sleep(2)
    # reload the volume just in case
    volume.load()
    with timeout(timeout_duration, 'ERROR: Timeout reached trying to mount EBS.'):
        # Keep polling while there is no attachment yet as well: an empty attachment list
        # means the attach has not shown up, not that it has completed.
        while not (volume.attachments and volume.attachments[0]['State'] == 'attached'):
            time.sleep(1)
            volume.load()
    volume.create_tags(
        Tags=[
            {
                'Key': 'AttachDate',
                'Value': str(datetime.datetime.today().date())
            },
        ]
    )
    print(f'Volume {volume_id} has been attached to instance {instance_id}')
def unmount_volume_from_device():
    """Kill processes holding files on the build volume, then offline (Windows) or force-unmount it."""
    print('Unmounting EBS volume from device...')
    on_windows = os.name == 'nt'
    # On Windows only the 'workspace' folder under the mount path is checked for open files.
    kill_processes(MOUNT_PATH + 'workspace' if on_windows else MOUNT_PATH)
    if on_windows:
        offline_drive()
        return
    subprocess.call(['umount', '-f', MOUNT_PATH])
def detach_volume_from_ec2_instance(volume, ec2_instance_id, force, timeout_duration=DEFAULT_TIMEOUT):
    """Detach *volume* (device 'xvdf') from *ec2_instance_id* and wait for it to be released.

    Args:
        volume: EC2 Volume resource to detach.
        ec2_instance_id: id of the instance the volume is attached to.
        force: value passed as ``Force`` on the first detach request.
        timeout_duration: seconds to wait before a forced detach is issued.

    Any attachment still listed after the final reload is printed.
    """
    print(f'Detaching volume {volume.volume_id} from instance {ec2_instance_id}')
    target = dict(Device='xvdf', InstanceId=ec2_instance_id, VolumeId=volume.volume_id)
    volume.detach_from_instance(Force=force, **target)

    try:
        with timeout(timeout_duration, 'ERROR: Timeout reached trying to unmount EBS.'):
            while True:
                current = volume.attachments
                if not len(current) or current[0]['State'] == 'detached':
                    break
                time.sleep(1)
                volume.load()
    except TimeoutError:
        print('Force detaching EBS.')
        volume.detach_from_instance(Force=True, **target)

    print(f'Volume {volume.volume_id} has been detached from instance {ec2_instance_id}')
    volume.load()
    remaining = volume.attachments
    if len(remaining):
        print('Volume still has attachments')
        for attachment in remaining:
            print(f"Volume {attachment['VolumeId']} {attachment['State']} to instance {attachment['InstanceId']}")
def mount_ebs(snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type):
    """Find or create the build volume for this job, attach it to this instance and mount it.

    Steps:
      1. Detach any volume still attached to this instance on 'xvdf'.
      2. Look the volume up by its ``Name`` tag; create it when missing, or replace it when
         its size/type differ from *disk_size*/*disk_type*.
      3. Attach and mount the volume.
      4. When free space is below LOW_EBS_DISK_SPACE_LIMIT, replace the volume with one
         25% larger, up to MAX_EBS_DISK_SIZE; beyond that the process exits with status 1.

    Args:
        disk_size: requested size in GB; an int or a numeric string.
        disk_type: requested EBS volume type.
    """
    # A size taken from the command line may be a str; EC2 reports sizes as ints, so
    # normalise once to keep the comparisons and the IOPS arithmetic meaningful.
    disk_size = int(disk_size)

    region = get_region_name()
    ec2_client = get_ec2_client(region)
    ec2_instance_id = get_ec2_instance_id()
    ec2_availability_zone = get_availability_zone()
    ec2_resource = get_ec2_resource(region)
    ec2_instance = ec2_resource.Instance(ec2_instance_id)

    for volume in ec2_instance.volumes.all():
        for attachment in volume.attachments:
            print(f"attachment device: {attachment['Device']}")
            if 'xvdf' in attachment['Device'] and attachment['State'] != 'detached':
                print('A device is already attached to xvdf. This likely means a previous build failed to detach its '
                      'build volume. This volume is considered orphaned and will be detached from this instance.')
                unmount_volume_from_device()
                # Force unmounts should not be used, as that will cause the EBS block device driver to fail the remount
                detach_volume_from_ec2_instance(volume, ec2_instance_id, False)

    mount_name = get_mount_name(repository_name, project, pipeline, branch, platform, build_type)
    response = ec2_client.describe_volumes(Filters=[{
        'Name': 'tag:Name', 'Values': [mount_name]
    }])
    existing_volumes = response.get('Volumes') or []

    created = False
    if not existing_volumes:
        print(f"Volume for {mount_name} doesn't exist creating it...")
        # volume doesn't exist, create it
        volume_id, created = create_volume(ec2_client, ec2_availability_zone, snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type)
    else:
        existing = existing_volumes[0]
        volume_id = existing['VolumeId']
        print(f"Current volume {volume_id} is a {existing['Size']} GB {existing['VolumeType']}")

        # Detach first: the volume has to be released before it can be deleted or
        # re-attached, and the detach must target the existing volume, not a replacement.
        if len(existing['Attachments']):
            # this is bad we shouldn't be attached, we should have detached at the end of a build
            attachment = existing['Attachments'][0]
            print(f'Volume already has attachment {attachment}, detaching...')
            detach_volume_from_ec2_instance(ec2_resource.Volume(volume_id), attachment['InstanceId'], True)

        if existing['Size'] != disk_size or existing['VolumeType'] != disk_type:
            print(
                f'Override disk attributes does not match the existing volume, deleting {volume_id} and replacing the volume')
            delete_volume(ec2_client, volume_id)
            volume_id, created = create_volume(ec2_client, ec2_availability_zone, snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type)

    volume = ec2_resource.Volume(volume_id)

    print_drives()
    attach_volume_to_ec2_instance(volume, volume_id, ec2_instance_id)
    mount_volume_to_device(created)
    print_drives()

    free_space_mb = get_free_space_mb(MOUNT_PATH)
    print(f'Free disk space {free_space_mb}MB')

    if free_space_mb < LOW_EBS_DISK_SPACE_LIMIT:
        print(f'Volume is running below EBS free disk space treshhold {LOW_EBS_DISK_SPACE_LIMIT}MB. Recreating volume and running clean build.')
        unmount_volume_from_device()
        detach_volume_from_ec2_instance(volume, ec2_instance_id, False)
        delete_volume(ec2_client, volume_id)

        new_disk_size = int(volume.size * 1.25)
        if new_disk_size > MAX_EBS_DISK_SIZE:
            # Volume sizes in this script are expressed in GB, so the limit is reported in GB.
            print(f'Error: EBS disk size reached to the allowed maximum disk size {MAX_EBS_DISK_SIZE}GB, please contact ly-infra@ and ly-build@ to investigate.')
            sys.exit(1)
        print('Recreating the EBS with disk size {}'.format(new_disk_size))
        volume_id, created = create_volume(ec2_client, ec2_availability_zone, snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, new_disk_size, disk_type)
        volume = ec2_resource.Volume(volume_id)
        attach_volume_to_ec2_instance(volume, volume_id, ec2_instance_id)
        mount_volume_to_device(created)
def unmount_ebs():
    """Unmount and detach the build volume attached to this instance on 'xvdf', if any.

    Also removes an 'envinject.properties' file from the current directory when present.
    """
    region = get_region_name()
    ec2_instance_id = get_ec2_instance_id()
    ec2_resource = get_ec2_resource(region)
    ec2_instance = ec2_resource.Instance(ec2_instance_id)

    if os.path.isfile('envinject.properties'):
        os.remove('envinject.properties')

    volume = None
    for attached_volume in ec2_instance.volumes.all():
        for attachment in attached_volume.attachments:
            print(f"attachment device: {attachment['Device']}")
            # Same test mount_ebs() uses to recognise the build volume, so both code paths
            # agree on which attachment counts (e.g. a device reported with a '/dev/' prefix).
            if 'xvdf' in attachment['Device'] and attachment['State'] != 'detached':
                volume = attached_volume

    if volume is None:
        # volume is not mounted
        print('Volume is not mounted')
        return

    unmount_volume_from_device()
    detach_volume_from_ec2_instance(volume, ec2_instance_id, False)
def delete_ebs(repository_name, project, pipeline, branch, platform, build_type):
    """Unmount the build volume, then delete the first volume whose Name tag matches the mount name."""
    unmount_ebs()

    ec2_client = get_ec2_client(get_region_name())
    name_filter = {
        'Name': 'tag:Name',
        'Values': [get_mount_name(repository_name, project, pipeline, branch, platform, build_type)],
    }
    response = ec2_client.describe_volumes(Filters=[name_filter])

    if 'Volumes' not in response or not len(response['Volumes']):
        return
    first_match = response['Volumes'][0]
    delete_volume(ec2_client, first_match['VolumeId'])
def main(action, snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type):
    """Run the operation named by *action* ('mount', 'unmount' or 'delete').

    Any other action does nothing. Always returns None.
    """
    # Lambdas defer the call so only the selected operation is evaluated.
    operations = {
        'mount': lambda: mount_ebs(snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type),
        'unmount': lambda: unmount_ebs(),
        'delete': lambda: delete_ebs(repository_name, project, pipeline, branch, platform, build_type),
    }
    operation = operations.get(action)
    if operation is not None:
        operation()
if __name__ == "__main__":
    # Script entry point: parse the options, run the action, exit with main()'s return value.
    cli = parse_args()
    sys.exit(main(
        action=cli.action,
        snapshot_hint=cli.snapshot_hint,
        repository_name=cli.repository_name,
        project=cli.project,
        pipeline=cli.pipeline,
        branch=cli.branch,
        platform=cli.platform,
        build_type=cli.build_type,
        disk_size=cli.disk_size,
        disk_type=cli.disk_type,
    ))