# Source code for eptransition.manager

import logging
import os
import shutil

from eptransition.exceptions import (
    FileAccessException as eFAE, FileTypeException as eFTE, ManagerProcessingException, ProcessingException
)
from pyiddidf.idd.processor import IDDProcessor
from pyiddidf.idf.objects import IDFStructure, ValidationIssue
from pyiddidf.idf.processor import IDFProcessor
from eptransition.rules.version_rule import VersionRule
from eptransition.versions.versions import TRANSITIONS, TypeEnum

# Logger used throughout this module for its debug and warning output
module_logger = logging.getLogger("eptransition.manager")


class _LocalRuleInformation(object):
    """
    Small holder for the two pieces of a transition rule that the manager needs while looping over idf objects.

    :param local_rule: A rule instance providing ``get_names_of_dependent_objects()`` and ``transition``
    """

    def __init__(self, local_rule):
        # object types whose instances are handed to the rule as "dependents"
        self.dependent_names = local_rule.get_names_of_dependent_objects()
        # bound callable that performs the transition of a single idf object
        self.transition_function = local_rule.transition


class TransitionManager(object):
    """
    This class is the main manager for performing transition of an input file to the latest version.

    Developer note: This class raises many exceptions, so logging.exception is handled at the level of the code
    calling these functions within a try/except block.  These functions do logging, but only at the
    debug/warning level.

    :param str original_input_file: Full path to the original idf to transition
    """

    def __init__(self, original_input_file):
        self.original_input_file = original_input_file
        module_logger.debug("Transitioning file: {}".format(original_input_file))
        input_directory = os.path.dirname(original_input_file)
        self.original_base_file_name = os.path.splitext(os.path.basename(original_input_file))[0]
        # outputs go into a sibling directory named after the input file (without its extension)
        self.output_directory = os.path.join(input_directory, self.original_base_file_name)
        module_logger.debug("Created the new output directory as {}".format(self.output_directory))
        if os.path.exists(self.output_directory):
            module_logger.debug("output_directory already exists, I'll leave it alone and hope for the best!")
        else:
            try:
                os.mkdir(self.output_directory)
            except OSError:
                module_logger.debug("Could not make output directory, permission issue maybe?")
                raise
        # instantiate to None for now; perform_transition fills these in with defaults when they are still None
        self.original_idd_file = None
        self.new_idd_file = None
        # also we should check for mvi/rvi files sitting next to the input file with the same base name
        self.original_rvi_file = None
        potential_rvi_path = os.path.join(input_directory, self.original_base_file_name + ".rvi")
        if os.path.exists(potential_rvi_path):
            self.original_rvi_file = potential_rvi_path
        self.original_mvi_file = None
        potential_mvi_path = os.path.join(input_directory, self.original_base_file_name + ".mvi")
        if os.path.exists(potential_mvi_path):
            self.original_mvi_file = potential_mvi_path

    def rvi_mvi_replace(self, original_file_path, new_file_path, output_rule):  # pragma no cover - open up later
        """
        Copy an rvi/mvi file line by line, replacing any line that matches a renamed output variable.

        A line matches when its stripped, upper-cased text equals the upper-cased key of one of the simple swaps
        reported by the output rule; the replacement is the stripped swap value followed by a newline.  All other
        lines are written through unchanged.

        :param str original_file_path: Path of the rvi/mvi file to read
        :param str new_file_path: Path of the rvi/mvi file to write
        :param output_rule: Object providing ``get_simple_swaps()`` (a dict of old name -> new name), or None,
                            in which case the file is copied through without any replacements
        :return: None
        """
        # first clean up the output variable list, make each key upper case and strip any randomness off each value
        # (.items() rather than .iteritems() so this works on both Python 2 and Python 3)
        simple_swaps = output_rule.get_simple_swaps() if output_rule is not None else {}
        upper_case_dictionary = {k.upper(): v.strip() for k, v in simple_swaps.items()}
        # then read the original file line by line, checking for a stripped-upper-case key; the input is opened
        # first so that a missing input file does not leave an empty output file behind
        with open(original_file_path) as f, open(new_file_path, 'w') as f_out:
            for line in f:
                lookup_key = line.strip().upper()
                if lookup_key in upper_case_dictionary:
                    f_out.write(upper_case_dictionary[lookup_key] + '\n')
                else:
                    f_out.write(line)

    def _versioned_path(self, version, extension):
        """Return ``<output_directory>/<base name>_<version>.<extension>`` for this manager's input file."""
        return os.path.join(
            self.output_directory, "{}_{}.{}".format(self.original_base_file_name, version, extension))

    @staticmethod
    def _copy_original(source_path, target_path, description):
        """Copy one original file into the output directory; log the real source path and re-raise on failure."""
        try:
            shutil.copy(source_path, target_path)
        except Exception:  # pragma no cover
            module_logger.debug("Could not copy original {} file from {} to {}".format(
                description, source_path, target_path))
            raise

    def perform_transition(self):
        """
        This function manages the transition from one version to another by opening, validating, and writing files

        :return: Tuple of (original idf structure, final transitioned idf structure); raises exception for failures
        :raises FileAccessException: if a specified file cannot be found
        :raises FileTypeException: if a specified file type does not match the expected condition
        :raises ManagerProcessingException: if there is a problem processing the contents of the files
        """

        # Validate input file related things
        if not os.path.exists(self.original_input_file):
            raise eFAE(self.original_input_file, eFAE.CANNOT_FIND_FILE, eFAE.ORIGINAL_INPUT_FILE)
        if self.original_input_file.endswith(".idf"):
            original_idf_file_type = TypeEnum.IDF
        elif self.original_input_file.endswith(".jdf"):  # pragma no cover
            original_idf_file_type = TypeEnum.JSON
        else:
            raise eFTE(self.original_input_file, eFTE.ORIGINAL_INPUT_FILE,
                       "Unexpected extension, should be .idf or .jdf")

        # At this point we now need to know the version of the idf, before we even try to read the idd really
        original_idf_processor = IDFProcessor()
        try:
            # this call _will_ process the version into IDFObject.version_float or raise an exception if it fails
            idf_to_transition = original_idf_processor.process_file_given_file_path(self.original_input_file)
            module_logger.debug(
                "Successfully processed idf structure; found {} objects".format(len(idf_to_transition.objects)))
        except Exception:
            raise ManagerProcessingException("Could not process original idf; aborting")

        # initialize the return structures
        original_idf_structure = idf_to_transition
        final_idf_structure = None

        # so we get the original idf version now
        original_idf_version = idf_to_transition.version_float
        module_logger.debug("Original IDF version found as {}".format(original_idf_version))

        # then we know which VERSION item we're working on:
        if original_idf_version not in TRANSITIONS:
            raise ManagerProcessingException(
                "IDF Version ({}) not found in available transitions".format(original_idf_version))
        # store the initial one
        first_transition = TRANSITIONS[original_idf_version]
        module_logger.debug("First transition to be attempted is from version {} to {}".format(
            first_transition.start_version, first_transition.end_version))
        # start the array of transitions, then keep following end versions for as long as they start another one
        these_transitions = [first_transition]
        current_transition_end_version = first_transition.end_version
        while current_transition_end_version in TRANSITIONS:  # pragma no cover
            # this block simply won't happen until we add a second transition in later
            current_transition = TRANSITIONS[current_transition_end_version]
            these_transitions.append(current_transition)
            current_transition_end_version = current_transition.end_version
            module_logger.debug("Follow-up transition found from version {} to {}".format(
                current_transition.start_version, current_transition.end_version
            ))

        # first copy the original files into the output directory renamed with the version ID for ease of
        # diffing, etc.; the rvi and mvi files get the same treatment when they exist
        self._copy_original(
            self.original_input_file, self._versioned_path(original_idf_version, "idf"), "input")
        if self.original_rvi_file:
            self._copy_original(
                self.original_rvi_file, self._versioned_path(original_idf_version, "rvi"), "rvi")
        if self.original_mvi_file:
            self._copy_original(
                self.original_mvi_file, self._versioned_path(original_idf_version, "mvi"), "mvi")

        last_transition_index = len(these_transitions) - 1
        for i, this_transition in enumerate(these_transitions):

            end_version = this_transition.end_version
            this_version_idf_file_path = self._versioned_path(end_version, "idf")
            module_logger.debug("Found this version in transitions, will try to transition from {} to {}".format(
                this_transition.start_version, this_transition.end_version
            ))

            # the prior-version paths are only bound (and only used) when the matching original file exists
            this_version_mvi_file_path = None
            if self.original_mvi_file:
                prior_version_mvi_file_path = self._versioned_path(this_transition.start_version, "mvi")
                this_version_mvi_file_path = self._versioned_path(end_version, "mvi")
                module_logger.debug("Found MVI version in transitions, will try to transition from {} to {}".format(
                    this_transition.start_version, this_transition.end_version
                ))

            this_version_rvi_file_path = None
            if self.original_rvi_file:
                prior_version_rvi_file_path = self._versioned_path(this_transition.start_version, "rvi")
                this_version_rvi_file_path = self._versioned_path(end_version, "rvi")
                module_logger.debug("Found RVI version in transitions, will try to transition from {} to {}".format(
                    this_transition.start_version, this_transition.end_version
                ))

            # if the IDD files are "None", then try to match them up
            # NOTE(review): these attributes are only filled while they are None, so on a second pass through this
            # loop they would still point at the first transition's dictionaries -- confirm before enabling
            # multi-step transitions
            idd_file = "Energy+.idd"
            cur_dir = os.path.dirname(os.path.realpath(__file__))
            if self.original_idd_file is None:
                self.original_idd_file = os.path.join(
                    cur_dir, "versions", str(this_transition.start_version), idd_file)
                module_logger.debug("Using \"original\" idd file at path: {}".format(self.original_idd_file))
            if self.new_idd_file is None:
                self.new_idd_file = os.path.join(cur_dir, "versions", str(this_transition.end_version), idd_file)
                module_logger.debug("Using \"new\" idd file at path: {}".format(self.new_idd_file))

            # Validate dictionary file things
            if not os.path.exists(self.original_idd_file):  # pragma no cover
                raise eFAE(self.original_idd_file, eFAE.CANNOT_FIND_FILE, eFAE.ORIGINAL_DICT_FILE)
            if not os.path.exists(self.new_idd_file):  # pragma no cover
                raise eFAE(self.new_idd_file, eFAE.CANNOT_FIND_FILE, eFAE.UPDATED_DICT_FILE)
            if self.original_idd_file.endswith(".idd"):
                original_idd_file_type = TypeEnum.IDF
            elif self.original_idd_file.endswith(".jdd"):  # pragma no cover
                original_idd_file_type = TypeEnum.JSON
            else:  # pragma no cover
                raise eFTE(self.original_idd_file, eFTE.ORIGINAL_DICT_FILE,
                           "Unexpected extension, should be .idd or .jdd")

            # now validate the file types match each other
            if original_idf_file_type != original_idd_file_type:  # pragma no cover
                raise ManagerProcessingException(
                    "Original file types don't match; input file={}; dictionary={}".format(
                        original_idf_file_type, original_idd_file_type))

            # and process the original idd file
            original_idd_processor = IDDProcessor()
            try:
                original_idd_structure = original_idd_processor.process_file_given_file_path(
                    self.original_idd_file)
                module_logger.debug("Successfully processed original idd")
            except ProcessingException as e:  # pragma no cover
                raise ManagerProcessingException("Could not process original idd; message = " + str(e))

            # and process the new idd file
            new_idd_processor = IDDProcessor()
            try:
                new_idd_structure = new_idd_processor.process_file_given_file_path(self.new_idd_file)
                module_logger.debug("Successfully processed \"new\" idd")
            except Exception:  # pragma no cover
                # "except Exception" rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
                raise ManagerProcessingException("Could not process new idd; aborting")

            # validate the current idf before continuing; the loop variable is deliberately not called "i" so
            # that the transition index from the enclosing enumerate() is still intact at the bottom of the loop
            issues = idf_to_transition.validate(original_idd_structure)
            for issue in issues:  # pragma no cover, we haven't really got these organized yet
                if issue.severity == ValidationIssue.INFORMATION:
                    module_logger.debug(str(issue))
                elif issue.severity == ValidationIssue.WARNING:
                    module_logger.warning(str(issue))
                elif issue.severity == ValidationIssue.ERROR:
                    raise ManagerProcessingException(
                        "Errors found in validating of original idf against original idd; aborting", issues)

            # now read in the rules and create a map based on the upper case version of the IDF object to transition
            this_version_rule = VersionRule(end_version)
            rules = [this_version_rule]
            rules.extend(this_transition.transitions)
            rule_map = {}
            for rule in rules:
                rule_map[rule.get_name_of_object_to_transition().upper()] = _LocalRuleInformation(rule)
            if this_transition.output_variable_transition is None:  # pragma no cover - shouldn't have this now
                output_rule = None
                output_names = []
                module_logger.warning(
                    "This transition did not find an output variable transition class...you sure?")
            else:
                output_rule = this_transition.output_variable_transition
                output_names = output_rule.get_output_objects()

            # here we can do the rvi mvi file stuff
            if this_version_rvi_file_path:
                self.rvi_mvi_replace(prior_version_rvi_file_path, this_version_rvi_file_path, output_rule)
            if this_version_mvi_file_path:
                self.rvi_mvi_replace(prior_version_mvi_file_path, this_version_mvi_file_path, output_rule)

            # create a list of objects to be deleted (which is a list of Type/Name, or more accurately Type/Field0)
            objects_to_delete = []

            # create an intermediate list of idf objects to tentatively be written to the idf
            intermediate_idf_objects = []

            # create a final list of idf objects to actually be written to the idf
            final_idf_objects = []

            # if there are any global swaps to make, make them now
            if this_transition.global_swap:
                idf_to_transition.global_swap(this_transition.global_swap)

            # loop over all objects in the original input file
            for original_idf_object in idf_to_transition.objects:
                idf_object_type_upper = original_idf_object.object_name.upper()
                # if the upper case version of this idf object is in the rule map, process the rule, otherwise just
                # keep the object in the intermediate list of objects to write
                if idf_object_type_upper in rule_map:
                    module_logger.debug("Transition object type: {}".format(idf_object_type_upper))
                    # retrieve the rule for this idf object
                    this_rule = rule_map[idf_object_type_upper]
                    # create a map of dependents; where the key is the upper case object type and the value is
                    # the list of all the objects found in the original idf
                    dependents = {}
                    for dep_idf_type in this_rule.dependent_names:  # pragma no cover -- add back when testing this
                        dependents[dep_idf_type.upper()] = idf_to_transition.get_idf_objects_by_type(dep_idf_type)
                    # call transition to actually perform object changes
                    transition_response = this_rule.transition_function(original_idf_object, dependents)
                    module_logger.debug(
                        "Object transition complete; # objects to write: {}; # objects to delete: {}".format(
                            len(transition_response.to_write), len(transition_response.to_delete)
                        ))
                    # extend the intermediate list with changed things and the delete list with things to delete
                    intermediate_idf_objects.extend(transition_response.to_write)
                    objects_to_delete.extend(transition_response.to_delete)
                elif idf_object_type_upper in output_names:
                    # create a map of dependents; where the key is the upper case object type and the value is
                    # the list of all the objects found in the original idf
                    dependents = {}
                    for dep_idf_type in output_rule.get_dependent_object_names():
                        dependents[dep_idf_type.upper()] = idf_to_transition.get_idf_objects_by_type(dep_idf_type)
                    transition_response = output_rule.transition(original_idf_object, dependents)
                    intermediate_idf_objects.extend(transition_response.to_write)
                else:
                    # if there was no rule, just keep the object as it is
                    intermediate_idf_objects.append(original_idf_object)

            # create a map of objects to delete based on type
            module_logger.debug(
                "First pass transition complete; # objects to delete: {}".format(len(objects_to_delete)))
            delete_map = {}
            for object_to_delete in objects_to_delete:  # pragma no cover -- add back when a real rule does this
                object_type_upper = object_to_delete.type.upper()
                object_name_upper = object_to_delete.name.upper()
                delete_map.setdefault(object_type_upper, []).append(object_name_upper)

            # and now we create the structure for this step's output
            final_idf_structure = IDFStructure(this_version_idf_file_path)
            final_idf_structure.version_float = this_version_rule.end_version_id
            final_idf_structure.version_string = str(final_idf_structure.version_float)
            module_logger.debug(
                "Created \"final\" idf structure for this transition step, assigned version = {}".format(
                    this_version_rule.end_version_id))

            # loop over all intermediate objects, dropping those whose type and first field are flagged for deletion
            for intermediate_idf_object in intermediate_idf_objects:
                # take out the pragma once we have a transition rule that actually does deletions
                inter_obj_name_upper = intermediate_idf_object.object_name.upper()
                if inter_obj_name_upper in delete_map:  # pragma no cover
                    if intermediate_idf_object.fields[0].upper() in delete_map[inter_obj_name_upper]:
                        continue
                final_idf_objects.append(intermediate_idf_object)
            final_idf_structure.objects = final_idf_objects

            # report and done
            module_logger.debug("Transition complete; final # objects: {}".format(len(final_idf_structure.objects)))

            # if we are going to cycle, we'll want the idf_to_transition variable to be filled
            # either way we'll write this intermediate file
            idf_to_transition = final_idf_structure
            final_idf_structure.write_idf(this_version_idf_file_path, new_idd_structure)
            if i == last_transition_index:
                module_logger.debug("Completed all transitions, writing file and leaving")
            else:  # pragma no cover -- add back once we have multiple transitions
                module_logger.debug("Going to start a new transition on this file, storing structure and continuing")

        return original_idf_structure, final_idf_structure