Source code for cromwell_manager.io_util
import os
import sys
from io import BytesIO, BufferedIOBase
from tempfile import NamedTemporaryFile
import zipfile
import datetime
from google.cloud import storage
import webbrowser
import requests
[docs]class GSObject:
def __init__(self, gs_filestring, client=None):
"""Object for downloading google storage blobs.
:param str gs_filestring: google storage url for file to be downloaded
:param google.cloud.storage.Client | None client: (optional) authenticated google storage
client
"""
# get client
if isinstance(client, storage.Client):
self.client = client
elif client is None:
self.client = storage.Client()
else:
raise TypeError('client must be a google.cloud.storage.Client object or None, not %s'
% type(client))
# get bucket, blob from filestring
if isinstance(gs_filestring, str) and gs_filestring.startswith('gs://'):
bucket, blob = self.split_path(gs_filestring)
self.bucket = self.client.bucket(bucket)
self.blob = self.bucket.get_blob(blob)
else:
raise TypeError('gs_filestring must be a string that startswith "gs://"')
@staticmethod
[docs] def split_path(path):
"""Utility to split a google storage path into bucket + key.
:param str path: google storage path (must have gs:// prefix)
:return str: bucket
:return str: blob
"""
if not path.startswith('gs://'):
raise ValueError('%s path is not a valid code review')
prefix, _, bucket, *blob = path.split('/')
return bucket, '/'.join(blob)
[docs] def download_as_string(self):
"""Download data as a string
:return str: downloaded blob data
"""
return self.blob.download_as_string().decode()
[docs] def download_to_file(self, file_object):
"""Download data to file
:param io.BufferedIOBase file_object: open bytes-writable file object
"""
if not isinstance(file_object, BufferedIOBase):
raise TypeError('file_object must be an open, writable file object')
self.blob.download_to_file(file_object)
[docs] def download_to_bytes_readable(self):
"""Return a bytes file-like object readable by requests and REST APIs
:return BufferedIOBase: readable file object
"""
string_buffer = BytesIO()
self.blob.download_to_file(string_buffer)
string_buffer.seek(0)
return string_buffer
[docs]class HTTPObject:
def __init__(self, url):
"""Object for downloading files at http or https endpoints.
e.g. github raw endpoints
:param str url: url of data to be downloaded to file
"""
if isinstance(url, str) and (url.startswith('http://') or url.startswith('https://')):
self.url = url
else:
raise TypeError('url must be a str that starts with http:// or https://')
[docs] def download_as_string(self):
"""Download data as a string
:return str: downloaded url data
"""
return requests.get(self.url).content.decode()
[docs] def download_to_file(self, file_object):
"""Download data to file
:param io.BufferedIOBase file_object: open bytes-writable file object
"""
bytestring = requests.get(self.url).content
file_object.write(bytestring)
[docs] def download_to_bytes_readable(self):
"""Return a bytes file-like object readable by requests and REST APIs
:return BufferedIOBase: readable file object
"""
bytestring = requests.get(self.url).content
buffer = BytesIO(bytestring)
buffer.seek(0)
return buffer
def exists(self):
return True if requests.head(self.url).status_code == 200 else False
[docs]def package_workflow_dependencies(**dependencies):
"""Download wdls, zip, and return a bytes-readable output
:param dependencies: dict of dependency (name, path) pairs to be included in the archive
- name should be the expected name for the imported dependency
- path should give the object's location, supports google storage, https, and local paths
:return File: file object with binary data written.
"""
archive_buffer = NamedTemporaryFile(delete=False)
archive = zipfile.ZipFile(archive_buffer, 'a')
for name, dependency in dependencies.items():
if dependency.startswith('gs://'):
dependency_data = GSObject(dependency).download_as_string()
archive.writestr(name, dependency_data)
if dependency.startswith('https://') or dependency.startswith('http://'):
dependency_data = HTTPObject(dependency).download_as_string()
archive.writestr(name, dependency_data)
else: # assume filepath
archive.write(dependency, arcname=name)
archive.close() # writes essential records
archive_buffer.close() # clean up, will be deleted on program termination
return open(archive_buffer.name, 'rb')
[docs]def open_gs_console(link, project):
"""open the google storage console to view the contents of link
:param str link: gs file or directory
:param str project: project owner of link
"""
if link.startswith('gs://'):
link = link.replace('gs://', '')
link = 'https://storage.cloud.google.com/{link}'.format(link=link)
if link.endswith('/'):
link += '?project={project}'.format(project=project)
link = 'https://console.cloud.google.com/storage/browser/{link}'.format(link=link)
webbrowser.open(link)
def check_exists(file_or_link):
"""check that a file or link points to a valid location
:param str file_or_link:
:return bool:
"""
if file_or_link.startswith('http'):
rc = requests.head(file_or_link).status_code
if rc == 200:
announce('checking {}... OK.'.format(file_or_link))
else:
announce('checking {}... returned code {!s}, FAIL.'.format(file_or_link, rc))
elif file_or_link.startswith('gs://'):
if GSObject(file_or_link).blob.exists():
announce('checking {}... OK.'.format(file_or_link))
else:
announce('checking {}... does not exist!, FAIL.'.format(file_or_link))
else:
if os.path.isfile(file_or_link):
announce('checking {}... OK.')
else:
announce('checking {}... not a valid file, FAIL.'.format(file_or_link))
def announce(message):
print('CWM:{}:{}'.format(datetime.datetime.now(), message))