Source code for cromwell_manager.resource_utilization



class ResourceUtilization:
    """Class to store resource utilization information for a task, run on Cromwell."""

    def __init__(self, task_name, max_memory, total_memory, max_disk, total_disk, robust):
        """Store the raw measurements and derive the used fractions.

        :param str task_name: name of the task the measurements belong to
        :param int max_memory: peak memory usage observed (MB)
        :param int total_memory: memory available to the task (MB)
        :param int max_disk: peak disk usage observed (KB)
        :param int total_disk: disk space available to the task (KB)
        :param bool robust: whether the estimate is based on enough log lines
        """
        self.task_name = task_name
        self.max_memory = max_memory
        self.total_memory = total_memory
        self.max_disk = max_disk
        self.total_disk = total_disk
        self.robust = robust
        self.fraction_disk_used = self._fraction(max_disk, total_disk)
        self.fraction_memory_used = self._fraction(max_memory, total_memory)

    @staticmethod
    def _fraction(used, total):
        """Return used / total, or NaN when total is zero (utilization undefined)."""
        # from_file() starts its totals at 0, so an empty or truncated log reaches
        # this point with total == 0; report NaN instead of raising.
        if total == 0:
            return float('nan')
        return used / total

    @classmethod
    def from_file(cls, task_name, open_log_file_object):
        """Create a ResourceUtilization object from a monitoring log file.

        :param str task_name: Name of this task
        :param file open_log_file_object: an open monitoring log from cromwell; lines
          may be bytes (binary mode) or str (text mode)
        :return ResourceUtilization: memory and disk utilization for this task
        """
        total_memory, total_disk, max_memory, max_disk = 0, 0, 0, 0
        i = 0  # stays 0 when the log is empty
        for i, line in enumerate(open_log_file_object):
            if isinstance(line, str):
                # accept text-mode files too; the prefixes below are bytes
                line = line.encode()
            line = line.lower()  # make a bit more robust to capitalization in script
            if line.startswith(b'total memory (mb):'):
                total_memory = int(line.split()[-1])
            elif line.startswith(b'total disk space (kb):'):
                total_disk = int(line.split()[-1])
            elif line.startswith(b'* memory usage (mb):'):
                max_memory = max(max_memory, int(line.split()[-1]))
            elif line.startswith(b'* disk usage (kb):'):
                max_disk = max(max_disk, int(line.split()[-1]))

        # i is the index of the last line read, so this requires at least six lines
        robust = i >= 5
        return cls(task_name, max_memory, total_memory, max_disk, total_disk, robust)

    def __str__(self):
        """Return a multi-line, human-readable monitoring summary."""
        # NOTE: the two "Utilized (%)" rows print the stored fractions (max / total),
        # not values multiplied by 100.
        return (
            "{task_name} Monitoring Summary:\n"
            "Max Memory Usage (MB): {max_memory!s}\n"
            "Available Memory (MB): {total_memory!s}\n"
            "Max disk usage (KB): {max_disk!s}\n"
            "Available disk (KB): {total_disk}\n"
            "Disk Utilized (%): {disk_utilization:.3f}\n"
            "Memory Utilized (%): {memory_utilization:.3f}\n"
            "Robust Estimate? : {robust}\n".format(
                task_name=self.task_name,
                max_memory=self.max_memory,
                total_memory=self.total_memory,
                max_disk=self.max_disk,
                total_disk=self.total_disk,
                robust=self.robust,
                disk_utilization=self.fraction_disk_used,
                memory_utilization=self.fraction_memory_used,
            )
        )

    def __bytes__(self):
        """Return the summary produced by ``__str__``, encoded to bytes."""
        return self.__str__().encode()

    @staticmethod
    def merge(x, y=None):
        """Merge two ResourceUtilization objects for the same task, returning the
        maximum utilization.

        The merged object takes its task name from ``x``. If either argument is
        None the other one is returned unchanged.

        :param ResourceUtilization x:
        :param ResourceUtilization y:
        :return ResourceUtilization: maximum resource utilization
        :raises TypeError: if an argument is neither a ResourceUtilization nor None
        """
        if not all(isinstance(v, ResourceUtilization) or v is None for v in (x, y)):
            raise TypeError(
                'Merge is only possible between ResourceUtilization Objects, not %s and'
                ' %s' % (x, y))
        if y is None:
            return x
        if x is None:
            # the type check above admits None for x as well, so honor it here
            return y
        return ResourceUtilization(
            x.task_name,
            max(x.max_memory, y.max_memory),
            max(x.total_memory, y.total_memory),
            max(x.max_disk, y.max_disk),
            max(x.total_disk, y.total_disk),
            any((x.robust, y.robust)),
        )