import re
import json
import webbrowser
from time import sleep
from collections.abc import Iterable
from itertools import repeat
import requests
from requests.auth import HTTPBasicAuth
[docs]class Cromwell:
"""Wrapper for the Cromwell REST API"""
def __init__(self, cromwell_url, username=None, password=None, api_version='v1'):
"""API wrapper for a running cromwell server
:param str cromwell_url: url of a running cromwell instance
:param str | None username: (optional) username for the cromwell instance
:param str | None password: (optional) password for the cromwell instance
:param str api_version: version of the cromwell API
"""
if isinstance(cromwell_url, str):
self._cromwell_url = cromwell_url
else:
raise TypeError('cromwell_url must be a str, not %s' % type(cromwell_url))
if isinstance(username, str) or username is None:
self.username = username
else:
raise TypeError('If provided, username must be a str, not %s' % type(username))
if isinstance(password, str) or username is None:
self.password = password
else:
raise TypeError('If provided, password must be a str, not %s' % type(password))
if isinstance(api_version, str):
self.api_version = api_version
else:
raise ValueError('version must be a str, not %s' % type(api_version))
self.auth = HTTPBasicAuth(username, password) if username and password else None
self.url_prefix = '{cromwell_url}/api/workflows/{version}'.format(
cromwell_url=self.cromwell_url, version=self.api_version)
# check that server is running
if not self.server_is_running():
raise RuntimeError('url, username, and password did not authenticate to a running '
'cromwell instance.')
def __repr__(self):
return '<Cromwell Server: ip: %s>' % self.cromwell_url
@property
def cromwell_url(self):
"""URL for the cromwell REST endpoints."""
return self._cromwell_url
@cromwell_url.setter
def cromwell_url(self, value):
if not re.match('https?://', value):
raise ValueError('cromwell_url must be an http or https address.')
if not self.server_is_running(value):
raise ValueError('cromwell_url does not match a running cromwell server')
self._cromwell_url = value.rstrip('/') # trailing slash is not accepted by cromwell
@staticmethod
[docs] def print_request(request_type, request_string, response):
"""Print a request to console.
Triggered by the verbose=True flag on cromwell or workspace functions and properties.
:param str request_type: {GET, POST} type of REST operation
:param str request_string: full request url
:param requests.Response response: response from request operation
"""
try:
formatted_response = json.dumps(response.json(), indent=2)
print('{request_type} Request: {request_string}\nResponse: {response}\n'
'Response Content:\n{response_content}'.format(
request_type=request_type, request_string=request_string,
response=response.status_code, response_content=formatted_response))
except json.decoder.JSONDecodeError: # no content obtained
print('{request_type} Request: {request_string}\nResponse: {response}\n'
.format(request_type=request_type, request_string=request_string,
response=response.status_code))
@staticmethod
[docs] def print_failure(response, message=''):
"""Print information on a failing request to console.
:param requests.Response response: response from request operation
:param str message: (optional) message to append to failure report
"""
print(
'Request: {url}\n'
'{message}\n'
'Response Code: {code}\n'
'Reason: {reason}\n'.format(
url=response.url, message=message, code=response.status_code, reason=response.reason
))
[docs] def swagger(self):
"""Open the swagger page for this cromwell server."""
webbrowser.open(self.cromwell_url)
[docs] def wait_for_status(self, status, workflow_id, verbose=False, timeout=15, delay=3):
"""Wait until any status in a list of potentially many statuses is achieved for a workflow.
:param Iterable status: Iterable of one or more statuses to wait for
:param str workflow_id: identifier hash code for a workflow
:param bool verbose: if True, print the requests made
:param int timeout: maximum time to wait
:param int delay: time between status queries
:return requests.Response: response object generated when workflow_id achieves the first
valid status
"""
# raise error if a non-existent status is passed (infinite loop)
status = set(status)
if verbose:
print('Waiting for workflow to achieve {status} status ...'
.format(status=status))
response = None
tries = range(0, timeout, delay) if timeout is not None else repeat(0, times=None)
for _ in tries:
response = self.status(workflow_id)
if response.json()['status'] in status:
return response
sleep(delay)
# workflow didn't start
message = ('Workflow took more than {n!s} seconds to achieve {status}'
''.format(n=timeout, status=status))
self.print_failure(response, message)
return response
[docs] def post(self, url, verbose=False, *args, **kwargs):
"""Make a REST POST query to url.
:param str url: POST query url
:param bool verbose: if True, print the query, response code, and content (default False)
:param args: additional arguments to pass to requests.post
:param kwargs: additional arguments to pass to requests.post
:return requests.Response: requests response object
"""
response = requests.post(url, auth=self.auth, *args, **kwargs)
if verbose:
self.print_request('POST', url, response)
return response
[docs] def get(self, url, verbose=False, open_browser=False, *args, **kwargs):
"""Make a REST GET query to url.
:param str url: GET query url
:param bool verbose: if True, print the query, response code, and content (default False)
:param bool open_browser: if True, display the GET result in browser (default False)
:param args: additional positional args to pass to requests.get
:param kwargs: additional keyword args to pass to request.get
:return requests.Response: requests response object
"""
response = requests.get(url, auth=self.auth, *args, **kwargs)
if verbose:
self.print_request('GET', url, response)
if open_browser:
webbrowser.open(url)
return response
[docs] def server_is_running(self, *args, **kwargs):
"""Return True if the server is running, else False.
:param bool verbose: if True, print the query, response code, and content (default False)
:param bool open_browser: if True, display the GET result in browser (default False)
"""
return True if self.get(self.cromwell_url, *args, **kwargs).status_code == 200 else False
[docs] def abort_workflow(self, workflow_id, *args, **kwargs):
"""Abort a workflow.
:param str workflow_id: hash for workflow to abort
:param bool verbose: if True, print the query, response code, and content (default False)
:param args: additional arguments to pass to requests.post
:param kwargs: additional arguments to pass to requests.post
:return response.Response: requests response object
"""
url = self.url_prefix + '/{id}/abort'.format(id=workflow_id)
return self.post(url, *args, **kwargs)
[docs] def submit(self, files, wait=True, timeout=15, delay=3, verbose=False, *args, **kwargs):
"""Submit a new workflow.
:param dict files: dictionary of files from workflow._submission_json
:param bool wait: if True, wait until workflow recognizes as submitted
:param int timeout: maximum time to wait
:param int delay: time between status queries
:param bool verbose: if True, print request results
:param args: additional positional args to pass to requests.post
:param kwargs: additional keyword args to pass to request.post
:return response.Response: requests response object
"""
submit_response = self.post(self.url_prefix, files=files, *args, **kwargs)
if submit_response.status_code > 201:
self.print_failure(submit_response, 'Workflow failed to start!')
return submit_response
if wait:
self.wait_for_status(
['Running', 'Submitted', 'Succeeded'],
workflow_id=submit_response.json()['id'],
timeout=timeout, delay=delay, verbose=verbose)
return submit_response
def batch(self):
raise NotImplementedError
[docs] def outputs(self, workflow_id, *args, **kwargs):
"""Retrieve outputs for workflow_id.
:param str workflow_id: hash for workflow to abort
:param bool verbose: if True, print the query, response code, and content (default False)
:param bool open_browser: if True, display the GET result in browser (default False)
:param args: additional positional args to pass to requests.get
:param kwargs: additional keyword args to pass to request.get
:return response.Response: requests response object
"""
url = self.url_prefix + '/{id}/outputs'.format(id=workflow_id)
return self.get(url, *args, **kwargs)
# todo add formatting to correct datetime string
[docs] def query(self, start=None, end=None, names=None, ids=None, status=None, labels=None, *args,
**kwargs):
"""Query cromwell for workflows matching specified metadata information.
:param str start: datetime string in format #todo
:param str end: datetime string in format #todo
:param list names: list of one or more workflow name(s)
:param list ids: list of one or more workflow id(s)
:param list status: list of one or more workflow status(es). Must be a valid status:
{Submitted, Running, Aborting, Failed, Succeeded, Aborted}
:param dict labels: dictionary of custom label:value pairs
:param bool verbose: if True, print the query, response code, and content (default False)
:param bool open_browser: if True, display the GET result in browser (default False)
:return requests.Response:
"""
tags = []
if start and isinstance(start, str):
tags.append('start={}'.format(start))
if end and isinstance(end, str):
tags.append('end={}'.format(end))
if names and isinstance(names, Iterable):
tags.extend(('name={}'.format(n) for n in names))
if ids and isinstance(ids, Iterable):
tags.extend(('id={}'.format(i) for i in ids))
if status and isinstance(status, Iterable):
tags.extend(('status={}'.format(s) for s in status))
if labels and isinstance(labels, dict):
tags.extend(('{k}={v}'.format(k=k, v=v) for k, v in labels.items()))
url = self.url_prefix + '/query?' + '&'.join(tags)
return self.get(url, *args, **kwargs)
# todo implement a way to filter queries by metadata information
def filter(self, **kwargs):
raise NotImplementedError
[docs] def status(self, workflow_id, *args, **kwargs):
"""Retrieve status for workflow_id.
:param str workflow_id: hash for workflow to abort
:param bool verbose: if True, print the query, response code, and content (default False)
:param bool open_browser: if True, display the GET result in browser (default False)
:param args: additional positional args to pass to requests.get
:param kwargs: additional keyword args to pass to request.get
:return response.Response: requests response object
"""
url = self.url_prefix + '/{id}/status'.format(id=workflow_id)
return self.get(url, *args, **kwargs)
[docs] def logs(self, workflow_id, *args, **kwargs):
"""Retrieve logs for workflow_id.
:param str workflow_id: hash for workflow to abort
:param bool verbose: if True, print the query, response code, and content (default False)
:param bool open_browser: if True, display the GET result in browser (default False)
:param args: additional positional args to pass to requests.get
:param kwargs: additional keyword args to pass to request.get
:return response.Response: requests response object
"""
url = self.url_prefix + '/{id}/logs'.format(id=workflow_id)
return self.get(url, *args, **kwargs)
# todo adjust to use the includeKey and excludeKey parameters; will make things way faster!
[docs] def backends(self, *args, **kwargs):
"""Retrieve backends for this cromwell instance.
:param bool verbose: if True, print the query, response code, and content (default False)
:param bool open_browser: if True, display the GET result in browser (default False)
:param args: additional positional args to pass to requests.get
:param kwargs: additional keyword args to pass to request.get
:return response.Response: requests response object
"""
return self.get(self.url_prefix + '/backends', *args, **kwargs)
[docs] def timing(self, workflow_id):
"""Open timing in browser window for workflow_id.
:param str workflow_id: run id to open timing for
"""
webbrowser.open('{prefix}/{id}/timing'.format(prefix=self.url_prefix, id=workflow_id))
[docs] def version(self, *args, **kwargs):
"""Retrieve the cromwell version
:param bool verbose: if True, print the query, response code, and content (default False)
:param bool open_browser: if True, display the GET result in browser (default False)
:param args: additional positional args to pass to requests.get
:param kwargs: additional keyword args to pass to request.get
:return response.Response: requests response object
"""
url = '{cromwell_url}/engine/{version}/version'.format(
cromwell_url=self.cromwell_url, version=self.api_version)
return self.get(url, *args, **kwargs)
[docs] def stats(self, *args, **kwargs):
"""Retrieve cromwell statistics on number of running jobs
:param bool verbose: if True, print the query, response code, and content (default False)
:param bool open_browser: if True, display the GET result in browser (default False)
:param args: additional positional args to pass to requests.get
:param kwargs: additional keyword args to pass to request.get
:return response.Response: requests response object
"""
url = '{cromwell_url}/engine/{version}/stats'.format(
cromwell_url=self.cromwell_url, version=self.api_version)
return self.get(url, *args, **kwargs)