diff --git a/scripts/build/bootstrap/incremental_build_util.py b/scripts/build/bootstrap/incremental_build_util.py index 32a6b0f526..296fbb5424 100755 --- a/scripts/build/bootstrap/incremental_build_util.py +++ b/scripts/build/bootstrap/incremental_build_util.py @@ -4,18 +4,18 @@ # import argparse -import ast import boto3 import datetime import urllib.request, urllib.error, urllib.parse import os import psutil import time -import requests import subprocess import sys import tempfile -import traceback +from contextlib import contextmanager +import threading +import _thread DEFAULT_REGION = 'us-west-2' DEFAULT_DISK_SIZE = 300 @@ -42,14 +42,18 @@ if os.name == 'nt': kernel32 = ctypes.WinDLL('kernel32', use_last_error=True) kernel32.GetDiskFreeSpaceExW.argtypes = (ctypes.c_wchar_p,) + (PULARGE_INTEGER,) * 3 + class UsageTuple(collections.namedtuple('UsageTuple', 'total, used, free')): def __str__(self): # Add thousands separator to numbers displayed return self.__class__.__name__ + '(total={:n}, used={:n}, free={:n})'.format(*self) + def is_dir_symlink(path): FILE_ATTRIBUTE_REPARSE_POINT = 0x0400 - return os.path.isdir(path) and (ctypes.windll.kernel32.GetFileAttributesW(str(path)) & FILE_ATTRIBUTE_REPARSE_POINT) + return os.path.isdir(path) and ( + ctypes.windll.kernel32.GetFileAttributesW(str(path)) & FILE_ATTRIBUTE_REPARSE_POINT) + def get_free_space_mb(path): if sys.version_info < (3,): # Python 2? @@ -77,16 +81,39 @@ if os.name == 'nt': used = total.value - free.value - return free.value / 1024 / 1024#for now + return free.value / 1024 / 1024 # for now else: def get_free_space_mb(dirname): st = os.statvfs(dirname) return st.f_bavail * st.f_frsize / 1024 / 1024 + def error(message): print(message) exit(1) + +@contextmanager +def timeout(duration, timeout_message): + timer = threading.Timer(duration, lambda: _thread.interrupt_main()) + timer.start() + try: + yield + except KeyboardInterrupt: + print(timeout_message) + raise TimeoutError + finally: + # If the action ends in specified time, timer is canceled + timer.cancel() + + +def print_drives(): + if os.name == 'nt': + drives_before = win32api.GetLogicalDriveStrings() + drives_before = drives_before.split('\000')[:-1] + print(drives_before) + + def parse_args(): parser = argparse.ArgumentParser() parser.add_argument('-a', '--action', dest="action", help="Action (mount|unmount|delete)") @@ -96,8 +123,10 @@ def parse_args(): parser.add_argument('-b', '--branch', dest="branch", help="Branch") parser.add_argument('-plat', '--platform', dest="platform", help="Platform") parser.add_argument('-c', '--build_type', dest="build_type", help="Build type") - parser.add_argument('-ds', '--disk_size', dest="disk_size", help="Disk size in Gigabytes (defaults to {})".format(DEFAULT_DISK_SIZE), default=DEFAULT_DISK_SIZE) - parser.add_argument('-dt', '--disk_type', dest="disk_type", help="Disk type (defaults to {})".format(DEFAULT_DISK_TYPE), default=DEFAULT_DISK_TYPE) + parser.add_argument('-ds', '--disk_size', dest="disk_size", + help=f"Disk size in Gigabytes (defaults to {DEFAULT_DISK_SIZE})", default=DEFAULT_DISK_SIZE) + parser.add_argument('-dt', '--disk_type', dest="disk_type", help=f"Disk type (defaults to {DEFAULT_DISK_TYPE})", + default=DEFAULT_DISK_TYPE) args = parser.parse_args() # Input validation @@ -117,19 +146,30 @@ def parse_args(): error('No platform specified') if args.build_type is None: error('No build_type specified') - + return args + def get_mount_name(repository_name, project, pipeline, branch, platform, build_type): - mount_name = "{}_{}_{}_{}_{}_{}".format(repository_name, project, pipeline, branch, platform, build_type) - mount_name = mount_name.replace('/','_').replace('\\','_') + mount_name = f"{repository_name}_{project}_{pipeline}_{branch}_{platform}_{build_type}" + mount_name = mount_name.replace('/', '_').replace('\\', '_') return mount_name + def get_pipeline_and_branch(pipeline, branch): - pipeline_and_branch = "{}_{}".format(pipeline, branch) - pipeline_and_branch = pipeline_and_branch.replace('/','_').replace('\\','_') + pipeline_and_branch = f"{pipeline}_{branch}" + pipeline_and_branch = pipeline_and_branch.replace('/', '_').replace('\\', '_') return pipeline_and_branch + +def get_region_name(): + session = boto3.session.Session() + region = session.region_name + if region is None: + region = DEFAULT_REGION + return region + + def get_ec2_client(region): client = boto3.client('ec2', region_name=region) return client @@ -140,48 +180,51 @@ def get_ec2_instance_id(): instance_id = urllib.request.urlopen('http://169.254.169.254/latest/meta-data/instance-id').read() return instance_id.decode("utf-8") except Exception as e: - print(e.message) + print(e) error('No EC2 metadata! Check if you are running this script on an EC2 instance.') def get_availability_zone(): try: - availability_zone = urllib.request.urlopen('http://169.254.169.254/latest/meta-data/placement/availability-zone').read() + availability_zone = urllib.request.urlopen( + 'http://169.254.169.254/latest/meta-data/placement/availability-zone').read() return availability_zone.decode("utf-8") except Exception as e: - print(e.message) + print(e) error('No EC2 metadata! Check if you are running this script on an EC2 instance.') def kill_processes(workspace='/dev/'): - ''' + """ Kills all processes that have open file paths associated with the workspace. Uses PSUtil for cross-platform compatibility - ''' + """ print('Checking for any stuck processes...') for proc in psutil.process_iter(): try: if workspace in str(proc.open_files()): - print("{} has open files in {}. Terminating".format(proc.name(), proc.open_files())) + print(f"{proc.name()} has open files in {proc.open_files()}. Terminating") proc.kill() - time.sleep(1) # Just to make sure a parent process has time to close + time.sleep(1) # Just to make sure a parent process has time to close except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): continue def delete_volume(ec2_client, volume_id): response = ec2_client.delete_volume(VolumeId=volume_id) - print('Volume {} deleted'.format(volume_id)) + print(f'Volume {volume_id} deleted') + def find_snapshot_id(ec2_client, repository_name, project, pipeline, platform, build_type, disk_size): - mount_name = get_mount_name(repository_name, project, pipeline, 'stabilization_2106', platform, build_type) # we take snapshots out of stabilization_2106 - response = ec2_client.describe_snapshots(Filters= [{ + mount_name = get_mount_name(repository_name, project, pipeline, 'stabilization_2106', platform, + build_type) # we take snapshots out of stabilization_2106 + response = ec2_client.describe_snapshots(Filters=[{ 'Name': 'tag:Name', 'Values': [mount_name] }]) snapshot_id = None if 'Snapshots' in response and len(response['Snapshots']) > 0: - snapshot_start_time_max = None # find the latest snapshot + snapshot_start_time_max = None # find the latest snapshot for snapshot in response['Snapshots']: if snapshot['State'] == 'completed' and snapshot['VolumeSize'] == disk_size: snapshot_start_time = snapshot['StartTime'] @@ -190,28 +233,33 @@ def find_snapshot_id(ec2_client, repository_name, project, pipeline, platform, b snapshot_id = snapshot['SnapshotId'] return snapshot_id -def create_volume(ec2_client, availability_zone, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type): - # The actual EBS default calculation for IOps is a floating point number, the closest approxmiation is 4x of the disk size for simplicity + +def create_volume(ec2_client, availability_zone, repository_name, project, pipeline, branch, platform, build_type, + disk_size, disk_type): mount_name = get_mount_name(repository_name, project, pipeline, branch, platform, build_type) - pipeline_and_branch = get_pipeline_and_branch(pipeline, branch) + pipeline_and_branch = get_pipeline_and_branch(pipeline, branch) parameters = dict( - AvailabilityZone = availability_zone, + AvailabilityZone=availability_zone, VolumeType=disk_type, - TagSpecifications= [{ + TagSpecifications=[{ 'ResourceType': 'volume', 'Tags': [ - { 'Key': 'Name', 'Value': mount_name }, - { 'Key': 'RepositoryName', 'Value': repository_name}, - { 'Key': 'Project', 'Value': project }, - { 'Key': 'Pipeline', 'Value': pipeline }, - { 'Key': 'BranchName', 'Value': branch }, - { 'Key': 'Platform', 'Value': platform }, - { 'Key': 'BuildType', 'Value': build_type }, - { 'Key': 'PipelineAndBranch', 'Value': pipeline_and_branch }, # used so the snapshoting easily identifies which volumes to snapshot + {'Key': 'Name', 'Value': mount_name}, + {'Key': 'RepositoryName', 'Value': repository_name}, + {'Key': 'Project', 'Value': project}, + {'Key': 'Pipeline', 'Value': pipeline}, + {'Key': 'BranchName', 'Value': branch}, + {'Key': 'Platform', 'Value': platform}, + {'Key': 'BuildType', 'Value': build_type}, + # Used so the snapshoting easily identifies which volumes to snapshot + {'Key': 'PipelineAndBranch', 'Value': pipeline_and_branch}, + ] }] ) - if 'io1' in disk_type.lower(): + # The actual EBS default calculation for IOps is a floating point number, + # the closest approxmiation is 4x of the disk size for simplicity + if 'io1' in disk_type.lower(): parameters['Iops'] = (4 * disk_size) snapshot_id = find_snapshot_id(ec2_client, repository_name, project, pipeline, platform, build_type, disk_size) @@ -230,16 +278,17 @@ def create_volume(ec2_client, availability_zone, repository_name, project, pipel time.sleep(1) response = ec2_client.describe_volumes(VolumeIds=[volume_id, ]) - while (response['Volumes'][0]['State'] != 'available'): - time.sleep(1) - response = ec2_client.describe_volumes(VolumeIds=[volume_id, ]) + with timeout(DEFAULT_TIMEOUT, 'ERROR: Timeout reached trying to create EBS.'): + while response['Volumes'][0]['State'] != 'available': + time.sleep(1) + response = ec2_client.describe_volumes(VolumeIds=[volume_id, ]) - print(("Volume {} created\n\tSnapshot: {}\n\tRepository {}\n\tProject {}\n\tPipeline {}\n\tBranch {}\n\tPlatform: {}\n\tBuild type: {}" - .format(volume_id, snapshot_id, repository_name, project, pipeline, branch, platform, build_type))) + print(f"Volume {volume_id} created\n\tSnapshot: {snapshot_id}\n\tRepository {repository_name}\n\t" + f"Project {project}\n\tPipeline {pipeline}\n\tBranch {branch}\n\tPlatform: {platform}\n\tBuild type: {build_type}") return volume_id, created -def mount_volume(created): +def mount_volume_to_device(created): print('Mounting volume...') if os.name == 'nt': f = tempfile.NamedTemporaryFile(delete=False) @@ -247,7 +296,7 @@ def mount_volume(created): select disk 1 online disk attribute disk clear readonly - """.encode('utf-8')) # assume disk # for now + """.encode('utf-8')) # assume disk # for now if created: print('Creating filesystem on new volume') @@ -259,18 +308,12 @@ def mount_volume(created): """.encode('utf-8')) f.close() - + subprocess.call(['diskpart', '/s', f.name]) time.sleep(5) - drives_after = win32api.GetLogicalDriveStrings() - drives_after = drives_after.split('\000')[:-1] - - print(drives_after) - - #drive_letter = next(item for item in drives_after if item not in drives_before) - drive_letter = MOUNT_PATH + print_drives() os.unlink(f.name) @@ -283,8 +326,8 @@ def mount_volume(created): subprocess.call(['mount', '/dev/xvdf', MOUNT_PATH]) -def attach_volume(volume, volume_id, instance_id, timeout=DEFAULT_TIMEOUT): - print('Attaching volume {} to instance {}'.format(volume_id, instance_id)) +def attach_volume_to_ec2_instance(volume, volume_id, instance_id, timeout_duration=DEFAULT_TIMEOUT): + print(f'Attaching volume {volume_id} to instance {instance_id}') volume.attach_to_instance(Device='xvdf', InstanceId=instance_id, VolumeId=volume_id) @@ -292,13 +335,10 @@ def attach_volume(volume, volume_id, instance_id, timeout=DEFAULT_TIMEOUT): time.sleep(2) # reload the volume just in case volume.load() - timeout_init = time.clock() - while (len(volume.attachments) and volume.attachments[0]['State'] != 'attached'): - time.sleep(1) - volume.load() - if (time.clock() - timeout_init) > timeout: - print('ERROR: Timeout reached trying to mount EBS') - exit(1) + with timeout(timeout_duration, 'ERROR: Timeout reached trying to mount EBS.'): + while len(volume.attachments) and volume.attachments[0]['State'] != 'attached': + time.sleep(1) + volume.load() volume.create_tags( Tags=[ { @@ -307,11 +347,11 @@ def attach_volume(volume, volume_id, instance_id, timeout=DEFAULT_TIMEOUT): }, ] ) - print('Volume {} has been attached to instance {}'.format(volume_id, instance_id)) + print(f'Volume {volume_id} has been attached to instance {instance_id}') -def unmount_volume(): - print('Umounting volume...') +def unmount_volume_from_device(): + print('Unmounting EBS volume from device...') if os.name == 'nt': kill_processes(MOUNT_PATH + 'workspace') f = tempfile.NamedTemporaryFile(delete=False) @@ -327,50 +367,31 @@ def unmount_volume(): subprocess.call(['umount', '-f', MOUNT_PATH]) -def detach_volume(volume, ec2_instance_id, force, timeout=DEFAULT_TIMEOUT): - print('Detaching volume {} from instance {}'.format(volume.volume_id, ec2_instance_id)) +def detach_volume_from_ec2_instance(volume, ec2_instance_id, force, timeout_duration=DEFAULT_TIMEOUT): + print(f'Detaching volume {volume.volume_id} from instance {ec2_instance_id}') volume.detach_from_instance(Device='xvdf', Force=force, InstanceId=ec2_instance_id, VolumeId=volume.volume_id) - timeout_init = time.clock() - while len(volume.attachments) and volume.attachments[0]['State'] != 'detached': - time.sleep(1) - volume.load() - if (time.clock() - timeout_init) > timeout: - print('ERROR: Timeout reached trying to unmount EBS.') - volume.detach_from_instance(Device='xvdf',Force=True,InstanceId=ec2_instance_id,VolumeId=volume.volume_id) - exit(1) - - print('Volume {} has been detached from instance {}'.format(volume.volume_id, ec2_instance_id)) + try: + with timeout(timeout_duration, 'ERROR: Timeout reached trying to unmount EBS.'): + while len(volume.attachments) and volume.attachments[0]['State'] != 'detached': + time.sleep(1) + volume.load() + except TimeoutError: + print('Force detaching EBS.') + volume.detach_from_instance(Device='xvdf', Force=True, InstanceId=ec2_instance_id, VolumeId=volume.volume_id) + + print(f'Volume {volume.volume_id} has been detached from instance {ec2_instance_id}') volume.load() if len(volume.attachments): print('Volume still has attachments') for attachment in volume.attachments: - print('Volume {} {} to instance {}'.format(attachment['VolumeId'], attachment['State'], attachment['InstanceId'])) - - -def attach_ebs_and_create_partition_with_retry(volume, volume_id, ec2_instance_id, created): - attach_volume(volume, volume_id, ec2_instance_id) - mount_volume(created) - attempt = 1 - while attempt <= MAX_EBS_MOUNTING_ATTEMPT: - if os.name == 'nt': - drives_after = win32api.GetLogicalDriveStrings() - drives_after = drives_after.split('\000')[:-1] - if MOUNT_PATH not in drives_after: - print('Disk partitioning failed, retrying...') - unmount_volume() - detach_volume(volume, ec2_instance_id, False) - attach_volume(volume, volume_id, ec2_instance_id) - mount_volume(created) - attempt += 1 + print(f"Volume {attachment['VolumeId']} {attachment['State']} to instance {attachment['InstanceId']}") + def mount_ebs(repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type): - session = boto3.session.Session() - region = session.region_name - if region is None: - region = DEFAULT_REGION + region = get_region_name() ec2_client = get_ec2_client(region) ec2_instance_id = get_ec2_instance_id() ec2_availability_zone = get_availability_zone() @@ -379,70 +400,70 @@ def mount_ebs(repository_name, project, pipeline, branch, platform, build_type, for volume in ec2_instance.volumes.all(): for attachment in volume.attachments: - print('attachment device: {}'.format(attachment['Device'])) + print(f"attachment device: {attachment['Device']}") if 'xvdf' in attachment['Device'] and attachment['State'] != 'detached': - print('A device is already attached to xvdf. This likely means a previous build failed to detach its ' \ + print('A device is already attached to xvdf. This likely means a previous build failed to detach its ' 'build volume. This volume is considered orphaned and will be detached from this instance.') - unmount_volume() - detach_volume(volume, ec2_instance_id, False) # Force unmounts should not be used, as that will cause the EBS block device driver to fail the remount + unmount_volume_from_device() + detach_volume_from_ec2_instance(volume, ec2_instance_id, + False) # Force unmounts should not be used, as that will cause the EBS block device driver to fail the remount mount_name = get_mount_name(repository_name, project, pipeline, branch, platform, build_type) response = ec2_client.describe_volumes(Filters=[{ 'Name': 'tag:Name', 'Values': [mount_name] - }]) + }]) created = False if 'Volumes' in response and not len(response['Volumes']): - print('Volume for {} doesn\'t exist creating it...'.format(mount_name)) + print(f'Volume for {mount_name} doesn\'t exist creating it...') # volume doesn't exist, create it - volume_id, created = create_volume(ec2_client, ec2_availability_zone, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type) + volume_id, created = create_volume(ec2_client, ec2_availability_zone, repository_name, project, pipeline, + branch, platform, build_type, disk_size, disk_type) else: volume = response['Volumes'][0] volume_id = volume['VolumeId'] - print('Current volume {} is a {} GB {}'.format(volume_id, volume['Size'], volume['VolumeType'])) - if (volume['Size'] != disk_size or volume['VolumeType'] != disk_type): - print('Override disk attributes does not match the existing volume, deleting {} and replacing the volume'.format(volume_id)) + print(f"Current volume {volume_id} is a {volume['Size']} GB {volume['VolumeType']}") + if volume['Size'] != disk_size or volume['VolumeType'] != disk_type: + print( + f'Override disk attributes does not match the existing volume, deleting {volume_id} and replacing the volume') delete_volume(ec2_client, volume_id) - volume_id, created = create_volume(ec2_client, ec2_availability_zone, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type) + volume_id, created = create_volume(ec2_client, ec2_availability_zone, repository_name, project, pipeline, + branch, platform, build_type, disk_size, disk_type) if len(volume['Attachments']): # this is bad we shouldn't be attached, we should have detached at the end of a build attachment = volume['Attachments'][0] - print(('Volume already has attachment {}, detaching...'.format(attachment))) - detach_volume(ec2_resource.Volume(volume_id), attachment['InstanceId'], True) + print(f'Volume already has attachment {attachment}, detaching...') + detach_volume_from_ec2_instance(ec2_resource.Volume(volume_id), attachment['InstanceId'], True) volume = ec2_resource.Volume(volume_id) - if os.name == 'nt': - drives_before = win32api.GetLogicalDriveStrings() - drives_before = drives_before.split('\000')[:-1] - - print(drives_before) - - attach_ebs_and_create_partition_with_retry(volume, volume_id, ec2_instance_id, created) + print_drives() + attach_volume_to_ec2_instance(volume, volume_id, ec2_instance_id) + mount_volume_to_device(created) + print_drives() free_space_mb = get_free_space_mb(MOUNT_PATH) - print('Free disk space {}MB'.format(free_space_mb)) - + print(f'Free disk space {free_space_mb}MB') + if free_space_mb < LOW_EBS_DISK_SPACE_LIMIT: - print('Volume is running below EBS free disk space treshhold {}MB. Recreating volume and running clean build.'.format(LOW_EBS_DISK_SPACE_LIMIT)) - unmount_volume() - detach_volume(volume, ec2_instance_id, False) + print(f'Volume is running below EBS free disk space treshhold {LOW_EBS_DISK_SPACE_LIMIT}MB. Recreating volume and running clean build.') + unmount_volume_from_device() + detach_volume_from_ec2_instance(volume, ec2_instance_id, False) delete_volume(ec2_client, volume_id) new_disk_size = int(volume.size * 1.25) if new_disk_size > MAX_EBS_DISK_SIZE: - print('Error: EBS disk size reached to the allowed maximum disk size {}MB, please contact ly-infra@ and ly-build@ to investigate.'.format(MAX_EBS_DISK_SIZE)) + print(f'Error: EBS disk size reached to the allowed maximum disk size {MAX_EBS_DISK_SIZE}MB, please contact ly-infra@ and ly-build@ to investigate.') exit(1) - print('Recreating the EBS with disk size {}'.format(new_disk_size)) - volume_id, created = create_volume(ec2_client, ec2_availability_zone, repository_name, project, pipeline, branch, platform, build_type, new_disk_size, disk_type) + print(f'Recreating the EBS with disk size {new_disk_size}') + volume_id, created = create_volume(ec2_client, ec2_availability_zone, repository_name, project, pipeline, + branch, platform, build_type, new_disk_size, disk_type) volume = ec2_resource.Volume(volume_id) - attach_ebs_and_create_partition_with_retry(volume, volume_id, ec2_instance_id, created) + attach_volume_to_ec2_instance(volume, volume_id, ec2_instance_id) + mount_volume_to_device(created) + def unmount_ebs(): - session = boto3.session.Session() - region = session.region_name - if region is None: - region = DEFAULT_REGION - ec2_client = get_ec2_client(region) + region = get_region_name() ec2_instance_id = get_ec2_instance_id() ec2_resource = boto3.resource('ec2', region_name=region) ec2_instance = ec2_resource.Instance(ec2_instance_id) @@ -454,7 +475,7 @@ def unmount_ebs(): for attached_volume in ec2_instance.volumes.all(): for attachment in attached_volume.attachments: - print('attachment device: {}'.format(attachment['Device'])) + print(f"attachment device: {attachment['Device']}") if attachment['Device'] == 'xvdf': volume = attached_volume @@ -462,24 +483,18 @@ def unmount_ebs(): # volume is not mounted print('Volume is not mounted') else: - unmount_volume() - detach_volume(volume, ec2_instance_id, False) + unmount_volume_from_device() + detach_volume_from_ec2_instance(volume, ec2_instance_id, False) + def delete_ebs(repository_name, project, pipeline, branch, platform, build_type): unmount_ebs() - - session = boto3.session.Session() - region = session.region_name - if region is None: - region = DEFAULT_REGION + region = get_region_name() ec2_client = get_ec2_client(region) - ec2_instance_id = get_ec2_instance_id() - ec2_resource = boto3.resource('ec2', region_name=region) - ec2_instance = ec2_resource.Instance(ec2_instance_id) mount_name = get_mount_name(repository_name, project, pipeline, branch, platform, build_type) response = ec2_client.describe_volumes(Filters=[ - { 'Name': 'tag:Name', 'Values': [mount_name] } + {'Name': 'tag:Name', 'Values': [mount_name]} ]) if 'Volumes' in response and len(response['Volumes']): @@ -496,7 +511,8 @@ def main(action, repository_name, project, pipeline, branch, platform, build_typ elif action == 'delete': delete_ebs(repository_name, project, pipeline, branch, platform, build_type) + if __name__ == "__main__": args = parse_args() - ret = main(args.action, args.repository_name, args.project, args.pipeline, args.branch, args.platform, args.build_type, args.disk_size, args.disk_type) - sys.exit(ret) \ No newline at end of file + main(args.action, args.repository_name, args.project, args.pipeline, args.branch, args.platform, + args.build_type, args.disk_size, args.disk_type)