Source code for veoibd_synapse.cli.push

#!/usr/bin/env python
"""Provide code devoted to uploading data to Synapse."""

# Imports
from logzero import logger as log

from pathlib import Path
import datetime as dt
import glob
from collections import deque

import networkx as nx
import synapseclient as synapse

from click.utils import echo

from munch import Munch, munchify

import veoibd_synapse.errors as e
from veoibd_synapse.misc import process_config, chunk_md5
import veoibd_synapse.dag_tools as dtools


# Metadata
__author__ = "Gus Dunn"
__email__ = "w.gus.dunn@gmail.com"



# Classes and functions
class Push(object):
    """Manage interactions with Synapse concerning adding/changing information on the Synapse servers.

    A ``Push`` validates the user and the push-config file, creates one
    ``PushInteraction`` per configured interaction (plus one that uploads the
    push-config file itself), and, after ``login()``, holds a DAG describing
    the remote project.

    Attributes:
        main_confs: Main configuration tree (``USERS`` and ``SITE`` are read).
        user: The validated entry of ``main_confs.USERS`` for the given user.
        push_id: Value returned by ``chunk_md5`` for the push-config file.
        push_time: ISO-8601 UTC timestamp taken when the config was validated.
        push_config_path: The ``push_config`` value as passed by the caller.
        push_config: The processed push-config tree.
        syn: The ``synapseclient.Synapse`` client instance.
        project: Synapse project entity; ``None`` until ``login()`` is called.
        entity_dicts: Remote entity dicts keyed by entity id; ``None`` until
            ``login()`` is called.
        dag: DAG of the remote project; ``None`` until ``login()`` is called.
        interactions: List of ``PushInteraction`` objects to execute.
    """

    def __init__(self, main_confs, user, push_config):
        """Initialize and validate basic information for a Push.

        Args:
            main_confs: Main configuration tree; must expose ``USERS``.
            user: Key into ``main_confs.USERS`` identifying who is pushing.
            push_config: Path of the push-config file.

        Raises:
            ValidationError: If ``user`` or ``push_config`` fails validation.
        """
        log.debug("Initializing Push obj.")

        self.main_confs = main_confs
        self.user = self._process_user(user=user, users=self.main_confs.USERS)

        # Both values are filled in by _process_push_config() on success.
        self.push_id = None
        self.push_time = None
        self.push_config_path = push_config
        self.push_config = self._process_push_config(push_config=push_config)

        log.info("Initializing Synapse client.")
        self.syn = synapse.Synapse()

        # Remote state; populated by login().
        self.project = None
        self.entity_dicts = None
        self.dag = None

        log.info("Creating interaction instances.")
        self._create_interactions()

    def _process_user(self, user, users):
        """Validate the value for 'user'.

        Args:
            user: Key to look up.
            users: Mapping of known users.

        Returns:
            The entry of ``users`` stored under ``user``.

        Raises:
            ValidationError: If ``user`` is ``None`` or not a key of ``users``.
        """
        if user is None:
            raise e.ValidationError('A value for USER must be provided.')

        try:
            return users[user]
        except KeyError as err:
            raise e.ValidationError('User "{user}" not found in "users.yaml".'.format(user=user)) from err

    def _process_push_config(self, push_config):
        """Validate the value for 'push_config' and set 'push_id' and 'push_time'.

        Args:
            push_config: Path of the push-config file.

        Returns:
            The processed push-config tree.

        Raises:
            ValidationError: If ``push_config`` is ``None``, if the config has
                no usable top-level ``INTERACTION_TYPE``, or if that value is
                not "push" (case-insensitive).
        """
        if push_config is None:
            raise e.ValidationError('A value for PUSH_CONFIG must be provided.')

        pconf = process_config(config=push_config)

        # Only the attribute lookup is guarded, so an AttributeError raised
        # further down (e.g. inside chunk_md5) is not misreported as a missing
        # INTERACTION_TYPE.
        try:
            itype = pconf.INTERACTION_TYPE.lower()
        except AttributeError as err:
            msg = """The push-config file must have "INTERACTION_TYPE" set in the top level."""
            raise e.ValidationError(msg) from err

        if itype != 'push':
            msg = """The "INTERACTION_TYPE" of a push-config file must be "push", not "{itype}".""".format(itype=itype)
            raise e.ValidationError(msg)

        self.push_id = chunk_md5(push_config, size=1024000)
        self.push_time = dt.datetime.now(tz=dt.timezone.utc).isoformat()
        return pconf

    def login(self):
        """Log in to Synapse and acquire the project entity.

        After logging in, the project named by ``push_config.PROJECT_NAME`` is
        fetched and the remote entity DAG is built.
        """
        log.info("Initiating log in to Synapse and acquiring the project entity.")
        self.syn.login(email=self.user.SYN_USERNAME, apiKey=self.user.API_KEY)

        project_name = self.push_config.PROJECT_NAME
        log.info("""Acquiring Synapse project instance for "{name}".""".format(name=project_name))

        try:
            self.project = self.syn.get(synapse.Project(name=project_name))
        except TypeError:
            # NOTE(review): presumably what the client raises when the project
            # does not exist yet; in that case it is created -- confirm.
            self.project = self.syn.store(synapse.Project(name=project_name))

        self._build_remote_entity_dag()

    def execute(self):
        """Execute the configured interactions in the order they were created."""
        log.info("Executing configured push interactions.")
        for interaction in self.interactions:
            interaction.execute()

    def _create_interactions(self):
        """Create and store interaction objects based on the push_config.

        One ``PushInteraction`` is created per entry of
        ``push_config.INTERACTIONS``; a final one uploads the push-config file
        itself into the remote "push_history" directory.
        """
        self.interactions = []

        # Keys that are merged into the base info rather than overwriting it.
        manual_updates = {"ANNOTATIONS"}

        for interaction in self.push_config.INTERACTIONS:
            info = self.__base_info()

            # Manual updates
            info.ANNOTATIONS.update(interaction.ANNOTATIONS)

            # Standard updates
            for key, value in interaction.items():
                if key not in manual_updates:
                    info[key] = value

            # create and store the PushInteraction
            self.interactions.append(PushInteraction(info=info, push_obj=self))

        # create special interaction to push the config file to the project
        record_info = Munch()
        record_info.REMOTE_DESTINATION_DIR = "push_history"
        record_info.CREATE_DIR = True
        record_info.ANNOTATIONS = Munch()
        record_info.ANNOTATIONS.file_type = 'yaml'
        record_info.ANNOTATIONS.push_history = True
        record_info.LOCAL_PATHS = [self.push_config_path]

        self.interactions.append(PushInteraction(info=record_info, push_obj=self))

    def __base_info(self):
        """Return a fresh basic info tree for a new interaction to update.

        The tree carries the depositing user, the consortium site, any
        ``COMMON_ANNOTATIONS`` from the push-config, and the ``PROJECT_ID``.
        """
        base_info = Munch({'ANNOTATIONS': Munch()})
        base_info.ANNOTATIONS.deposited_by = self.user.SYN_USERNAME
        base_info.ANNOTATIONS.consortium_site = self.main_confs.SITE.SITE_NAME

        # COMMON_ANNOTATIONS is optional: a missing or empty value is skipped.
        try:
            common_annotations = self.push_config.COMMON_ANNOTATIONS
        except AttributeError:
            common_annotations = None
        if common_annotations:
            base_info.ANNOTATIONS.update(common_annotations)

        base_info.PROJECT_ID = self.push_config.PROJECT_ID
        return base_info

    def _get_remote_entity_dicts(self):
        """Query Synapse for all entity information related to this Project ID.

        The result is stored on ``self.entity_dicts`` as a dict keyed by entity
        id, each value being that entity's dict with the leading 'entity.'
        removed from its keys.
        """
        # Drop the first three characters of the id (presumably the "syn"
        # prefix -- confirm) before using it in the query.
        pid = self.project['id'][3:]
        q = 'SELECT * FROM entity WHERE projectId=="{pid}"'.format(pid=pid)

        prefix = 'entity.'
        entity_dicts = {}
        for ent in self.syn.query(q)['results']:
            # Strip only a *leading* 'entity.'; str.replace() would also remove
            # the same text wherever else it occurs in a key.
            entity_dicts[ent['entity.id']] = {
                (key[len(prefix):] if key.startswith(prefix) else key): val
                for key, val in ent.items()
            }

        self.entity_dicts = entity_dicts

    def _build_remote_entity_dag(self):
        """Build a DAG of the remote project structure.

        Raises:
            ValidationError: If the resulting graph is not a directed acyclic
                graph.
        """
        log.info("Building the project's DAG.")
        self._get_remote_entity_dicts()

        dag = dtools.ProjectDAG(project_id=self.project['id'], synapse_session=self.syn)

        # add nodes and edges with nodes being simple id names for now
        for n_id, node in self.entity_dicts.items():
            dag.add_edge(u=node['parentId'], v=n_id)

        # remove Project's parent from the dag bc it is useless to us.
        # we want our project as root.
        dag.remove_node(self.project['parentId'])

        # SynNode objects as values of each node
        for name in dag.nodes():
            dag.node[name] = dtools.SynNode(entity_dict=self.entity_dicts[name], synapse_session=self.syn)

        # label our project as root
        dag.node[self.project['id']].is_root = True

        if nx.dag.is_directed_acyclic_graph(dag):
            self.dag = dag
        else:
            raise e.ValidationError('networkx.dag.is_directed_acyclic_graph() returned `False` suggesting a cyclic relationship between entities.')
class BaseInteraction:
    """Common base for objects describing one interaction with Synapse.

    The base class only keeps the information tree it is given; subclasses
    supply the behaviour.
    """

    def __init__(self, info):
        """Store the information tree on the instance.

        Args:
            info: Information tree for this interaction; kept as ``self.info``.
        """
        self.info = info
class PushInteraction(BaseInteraction):
    """Manage information and execution for a single "push" interaction with Synapse.

    Attributes:
        info: Information tree; ``LOCAL_PATHS`` is replaced at construction by
            a list of ``pathlib.Path`` objects.
        push: The host ``Push`` object.
        syn: The Synapse client shared with the host ``Push``.
        destination: Value returned by the DAG's ``follow_path_to_folder``;
            ``None`` until ``prepare_destination()`` has run.
    """

    def __init__(self, info, push_obj):
        """Initialize a PushInteraction.

        Args:
            info (dict-like): Basic information tree.
            push_obj (Push): Pointer the host ``Push`` object.

        Raises:
            ValidationError: If ``push_obj`` is not a ``Push`` or if an entry
                of ``info.LOCAL_PATHS`` matches no file.
        """
        super().__init__(info)

        self.push = self._process_push_obj(push_obj)
        self.syn = self.push.syn
        self.destination = None

        self.info.LOCAL_PATHS = self._process_local_paths()

    def prepare_destination(self):
        """Get or create remote destination."""
        log.info("""Preparing destination "{path}".""".format(path=self.info.REMOTE_DESTINATION_DIR))

        # does our destination exist?
        # If not, create Synapse Objects for them and add to the DAG if appropriate.
        path = deque(self.info.REMOTE_DESTINATION_DIR.split('/'))
        destination = self.push.dag.follow_path_to_folder(path=path, origin=None, create=self.info.CREATE_DIR)

        self.destination = destination

    def add_file(self, loc_file):
        """Create and add Synapse File object to DAG and upload to Synapse.

        Args:
            loc_file (pathlib.Path): Local file to upload.
        """
        log.info("""File: "{name}".""".format(name=loc_file.name))

        # Create and add file to Synapse
        parent_obj = self.push.dag.node[self.destination]
        annotations = self.info.ANNOTATIONS

        new_file = synapse.File(path=str(loc_file), parent=parent_obj, annotations=annotations)
        new_file = self.syn.store(new_file)
        new_file_id = new_file['id']

        # add file to DAG
        self.push.dag.add_edge(u=self.destination, v=new_file_id, attr_dict=None)

        entity_dict = dict(new_file.items())
        self.push.dag.node[new_file_id] = dtools.SynNode(entity_dict=entity_dict, synapse_session=self.syn, is_root=False)

    def execute(self):
        """Execute the push interaction: prepare the destination, then add every local file."""
        self.prepare_destination()

        for loc_file in self.info.LOCAL_PATHS:
            self.add_file(loc_file=loc_file)

    def _process_push_obj(self, push_obj):
        """Make sure we have what we think we have.

        Returns:
            ``push_obj`` unchanged.

        Raises:
            ValidationError: If ``push_obj`` is not an instance of ``Push``.
        """
        if not isinstance(push_obj, Push):
            msg = """Expected object of type: "Push", but got one of type: {got}.""".format(got=push_obj.__class__)
            raise e.ValidationError(msg)

        return push_obj

    def _process_local_paths(self):
        """Make sure that all local paths exist and convert them to Path()s.

        Each entry of ``info.LOCAL_PATHS`` is treated as a glob pattern.

        Returns:
            list of pathlib.Path: Every path matched by the patterns, in
            pattern order.

        Raises:
            ValidationError: If a pattern matches nothing.
        """
        local_paths = []

        for p in self.info.LOCAL_PATHS:
            # glob expects a string pattern; str() lets an entry that is
            # already a pathlib.Path be used as well.
            globbed = glob.glob(str(p))

            if not globbed:
                msg = """LOCAL_PATH value: {p} did not resolve to any files.""".format(p=p)
                raise e.ValidationError(msg)

            local_paths.extend(Path(gp) for gp in globbed)

        return local_paths
def main(ctx, user, push_config):
    """Run one push described by a push-config file.

    Builds a ``Push`` from the configuration held in ``ctx.obj.CONFIG``, logs
    in to Synapse, and executes the configured interactions.

    Args:
        ctx: Context object whose ``obj.CONFIG`` holds the main configuration.
        user: User key forwarded to ``Push``.
        push_config: Push-config file path forwarded to ``Push``.
    """
    pusher = Push(
        main_confs=ctx.obj.CONFIG,
        user=user,
        push_config=push_config,
    )
    pusher.login()
    pusher.execute()