Source code for eptransition.idf.processor

import StringIO
import logging
import os

from eptransition import exceptions
from eptransition.idf.objects import IDFObject, IDFStructure

module_logger = logging.getLogger("eptransition.idd.processor")


[docs]class IDFProcessor: """ The core IDF Processor class. Given an IDF via stream or path, this class has workers to robustly process the IDF into a rich IDFStructure instance. The constructor takes no arguments but sets up instance variables. Relevant "public" members are listed here: :ivar IDFStructure idf: The resulting IDFStructure instance after processing the IDF file/stream :ivar str file_path: A file path for this IDF, although it may be just a simple descriptor """ def __init__(self): self.idf = None self.file_path = None self.input_file_stream = None
[docs] def process_file_given_file_path(self, file_path): """ This worker allows processing of an IDF file at a specific path on disk. :param file_path: The path to an IDF file on disk. :return: An IDFStructure instance created from processing the IDF file :raises ProcessingException: if the specified file does not exist """ if not os.path.exists(file_path): raise exceptions.ProcessingException("Input file not found=\"" + file_path + "\"") self.input_file_stream = open(file_path, "r") self.file_path = file_path return self.process_file()
[docs] def process_file_via_stream(self, idf_file_stream): """ This worker allows processing of an IDF snippet via stream. Most useful for unit testing, but possibly for other situations. :param file-like-object idf_file_stream: An IDF snippet that responds to typical file-like commands such as read(). A common object would be the StringIO object. :return: An IDFStructure instance created from processing the IDF snippet """ self.input_file_stream = idf_file_stream self.file_path = "/streamed/idf" return self.process_file()
[docs] def process_file_via_string(self, idf_string): """ This worker allows processing of an IDF snippet string. Most useful for unit testing, but possibly for other situations. :param str idf_string: An IDF snippet string :return: An IDFStructure instance created from processing the IDF string """ self.input_file_stream = StringIO.StringIO(idf_string) self.file_path = "/string/idf/snippet" return self.process_file()
[docs] def process_file(self): """ Internal worker function that reads the IDF stream, whether it was constructed from a file path, stream or string. This processor then processes the file line by line looking for IDF objects and comment blocks, and parsing them into a meaningful structure :return: An IDF structure describing the IDF contents :raises ProcessingException: for any issues encountered during the processing of the idf """ self.idf = IDFStructure(self.file_path) # phase 0: read in lines of file lines = self.input_file_stream.readlines() class Blob: COMMENT = 1 OBJECT = 2 def __init__(self, blob_type, blob_lines=None): self.blob_type = blob_type if blob_lines is None: blob_lines = [] self.lines = blob_lines # should there maybe be 3 "state as of last line" options? # 1. reading comment block # 2. reading IDF block # so let's try keeping the idf in blobs of either comment data or object data current_blob = None initial_blobs = [] for line in lines: line_text = line.strip() if len(line_text) == 0: continue elif line_text.startswith("!"): if current_blob is None: current_blob = Blob(Blob.COMMENT) current_blob.lines.append(line_text) elif current_blob.blob_type == Blob.COMMENT: # just add to the lines and carry on current_blob.lines.append(line_text) elif current_blob.blob_type == Blob.OBJECT: # ignore it, we are still trying to read the object.. continue else: if current_blob is None: # then this blob is fresh and is the start of a new object, but it could also be the end (one-liner) current_blob = Blob(Blob.OBJECT) actual_line = line_text if "!" in line_text >= 0: actual_line = line_text[:line_text.find("!")] if ";" in actual_line: # we end this object blob current_blob.lines.append(line_text) initial_blobs.append(current_blob) current_blob = None else: current_blob.lines.append(line_text) elif current_blob.blob_type == Blob.OBJECT: # then we should append this line to the current blob, but we also need to check if it is the end current_blob.lines.append(line_text) actual_line = line_text if "!" in line_text >= 0: actual_line = line_text[:line_text.find("!")] if ";" in actual_line: # we end this object blob initial_blobs.append(current_blob) current_blob = None elif current_blob.blob_type == Blob.COMMENT: # then we need to package up the previous comment blob, and create this new one, but again..1-liner? initial_blobs.append(current_blob) current_blob = Blob(Blob.OBJECT) current_blob.lines.append(line_text) actual_line = line_text if "!" in line_text >= 0: actual_line = line_text[:line_text.find("!")] if ";" in actual_line: # we end this object blob initial_blobs.append(current_blob) current_blob = None if current_blob is not None: initial_blobs.append(current_blob) # next let's go blob by blob and clean up any trailing comments and such idf_objects = [] for initial_blob in initial_blobs: if initial_blob.blob_type == Blob.COMMENT: idf_objects.append(IDFObject(initial_blob.lines, True)) else: out_lines = [] for line in initial_blob.lines: line_text = line.strip() this_line = "" if len(line_text) > 0: exclamation = line_text.find("!") if exclamation == -1: this_line = line_text elif exclamation > 0: this_line = line_text[:exclamation] if not this_line == "": out_lines.append(this_line.strip()) # check these object lines for malformed idf syntax for l in out_lines: if not (l.endswith(",") or l.endswith(";")): raise exceptions.ProcessingException( "IDF line doesn't end with comma/semicolon\nline:\"" + l + "\"") # intermediate: join entire array and re-split by semicolon idf_data_joined = "".join(out_lines) idf_object_strings = idf_data_joined.split(";") # phase 3: inspect each object and its fields for obj in idf_object_strings: tokens = obj.split(",") nice_object = [t.strip() for t in tokens] if len(nice_object) == 1: if nice_object[0] == "": continue idf_objects.append(IDFObject(nice_object)) self.idf.objects = idf_objects try: self.idf.version_string = self.idf.get_idf_objects_by_type("Version")[0].fields[0] except IndexError: raise exceptions.ProcessingException("Could not get version object in idf!") try: version_tokens = self.idf.version_string.split(".") tmp_string = "{}.{}".format(version_tokens[0], version_tokens[1]) self.idf.version_float = float(tmp_string) except ValueError: raise exceptions.ProcessingException( "Found IDF version, but could not coerce into floating point representation") return self.idf