Source code for cromwell_manager.calledtask
from .resource_utilization import ResourceUtilization
from .io_util import GSObject
class Shard:
    """Dictionary-like wrapper around the metadata of a single task shard.

    Item lookup, item assignment and ``len()`` are forwarded to the wrapped
    metadata dictionary. On construction the shard also tries to load the
    resource utilization recorded in its monitoring log.
    """

    def __init__(self, metadata, client):
        """
        :param dict metadata: shard metadata; ``metadata['monitoringLog']`` is
          read here when present, and ``metadata['labels']['wdl-task-name']``
          is used as the task name
        :param client: storage client handed unchanged to ``GSObject``
        """
        self._data = metadata
        self._resource_utilization = None

        # A shard whose metadata carries no monitoring log entry simply has no
        # resource utilization; do not fail construction with a KeyError.
        log_location = self._data.get('monitoringLog')
        if log_location is None:
            return

        gs_log = GSObject(log_location, client)
        try:
            fileobj = gs_log.download_to_bytes_readable()
            self._resource_utilization = ResourceUtilization.from_file(
                task_name=self._data['labels']['wdl-task-name'],
                open_log_file_object=fileobj)
        except AttributeError:  # monitoringLog does not exist for this task
            self._resource_utilization = None

    def __repr__(self):
        return '<Google Compute Shard: %s>' % self._data['labels']['wdl-task-name']

    def __getitem__(self, item):
        """Return the metadata value stored under ``item``."""
        return self._data[item]

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` in the wrapped metadata dictionary."""
        self._data[key] = value

    def __len__(self):
        """Return the number of entries in the wrapped metadata dictionary."""
        return len(self._data)

    @property
    def resource_utilization(self):
        """Resource utilization loaded from the monitoring log, or ``None``
        when no monitoring log was available for this shard."""
        return self._resource_utilization
class CalledTask:
    """Object to define an instance of a called workflow task.

    A called task owns one ``Shard`` per entry of the shard metadata list it
    was built from.
    """

    def __init__(self, name, shard_metadata, client):
        """
        :param str name: name of task
        :param list shard_metadata: list of metadata dictionaries, one per
          shard of this task
        :param google.cloud.storage.Client client: Authenticated google storage
          client
        :raises TypeError: if ``shard_metadata`` is not a list
        """
        if not isinstance(shard_metadata, list):
            raise TypeError('shard_metadata must be a list, not %s' % type(shard_metadata))
        self._name = name
        self._storage_client = client
        self._shards = [Shard(s, client) for s in shard_metadata]

    def __repr__(self):
        return "<CalledTask: %s, %d shard(s)>" % (self._name, len(self._shards))

    @property
    def is_singleton(self):
        """True when the task consists of exactly one shard."""
        return len(self._shards) == 1

    @property
    def is_scattered(self):
        """Negation of ``is_singleton`` (so also True for a task without shards)."""
        return not self.is_singleton

    @property
    def name(self):
        """Name of the task."""
        return self._name

    @property
    def resource_utilization(self):
        """Resource utilization of the task, merged across all of its shards.

        :return: the only shard's utilization for a single-shard task, the
          successive ``ResourceUtilization.merge`` of all shards otherwise, or
          ``None`` when the task has no shards
        """
        # A task without shards has nothing to report; avoid indexing an
        # empty list.
        if not self._shards:
            return None

        merged = self._shards[0].resource_utilization
        # NOTE(review): a shard without a monitoring log reports None here;
        # confirm that ResourceUtilization.merge tolerates None operands.
        for shard in self._shards[1:]:
            merged = ResourceUtilization.merge(merged, shard.resource_utilization)
        return merged