Source code for eptransition.idd.processor

import StringIO
import logging
import os

from eptransition import exceptions
from eptransition.idd.objects import IDDField, IDDObject, IDDStructure, IDDGroup

module_logger = logging.getLogger("eptransition.idd.processor")


class CurrentReadType:
    """
    Internal class containing constants for the different states of the actual IDD Processor engine
    """
    EncounteredComment_ReadToCR = 0
    ReadAnything = 1
    ReadingGroupDeclaration = 2
    ReadingObjectName = 3
    LookingForObjectMetaDataOrNextField = 4
    ReadingObjectMetaData = 5
    ReadingObjectMetaDataContents = 6
    ReadingFieldANValue = 7
    ReadingFieldMetaData = 8
    ReadingFieldMetaDataOrNextANValue = 9
    LookingForFieldMetaDataOrNextObject = 10
    LookingForFieldMetaDataOrNextField = 11

# keep a global dictionary of read IDD structures, could eventually move into the class, but right now we instantiate
# the class over and over so that wouldn't work
IDD_CACHE = {}

class IDDProcessor:
    """
    The core IDD Processor class.  Given an IDD via stream or path, this class has workers to robustly process the
    IDD into a rich IDDStructure instance.

    The constructor takes no arguments but sets up instance variables.  Relevant "public" members are listed here:

    :ivar IDDStructure idd: The resulting IDDStructure instance after processing the IDD file/stream
    :ivar str file_path: A file path for this IDD, although it may be just a simple descriptor
    """

    def __init__(self):
        self.idd = None
        self.idd_file_stream = None
        self.file_path = None
        self.group_flag_string = "\\group"
        self.obj_flags = ["\\memo", "\\unique-object", "\\required-object", "\\min-fields", "\\obselete",
                          "\\extensible", "\\format"]
        self.field_flags = ["\\field", "\\note", "\\required-field", "\\begin-extensible", "\\unitsBasedOnField",
                            "\\units", "\\ip-units", "\\scheduleunits", "\\minimum", "\\maximum", "\\default",
                            "\\deprecated", "\\autosizable", "\\autocalculatable", "\\type", "\\retaincase",
                            "\\key", "\\object-list", "\\reference", "\\external-list"]

    def process_file_given_file_path(self, file_path):
        """
        This worker allows processing of an IDD file at a specific path on disk.

        :param file_path: The path to an IDD file on disk.
        :return: An IDDStructure instance created from processing the IDD file
        :raises ProcessingException: if the specified file does not exist
        """
        if not os.path.exists(file_path):
            raise exceptions.ProcessingException("Input IDD file not found=\"" + file_path + "\"")  # pragma: no cover
        self.idd_file_stream = open(file_path, "r")
        self.file_path = file_path
        return self.process_file()

    def process_file_via_stream(self, idd_file_stream):
        """
        This worker allows processing of an IDD snippet via stream.  Most useful for unit testing, but possibly
        for other situations.

        :param file-like-object idd_file_stream: An IDD snippet that responds to typical file-like commands such as
                                                 read().  A common object would be the StringIO object.
        :return: An IDDStructure instance created from processing the IDD snippet
        """
        self.idd_file_stream = idd_file_stream
        self.file_path = "/streamed/idd"
        return self.process_file()

    def process_file_via_string(self, idd_string):
        """
        This worker allows processing of an IDD snippet string.  Most useful for unit testing, but possibly
        for other situations.

        :param str idd_string: An IDD snippet string
        :return: An IDDStructure instance created from processing the IDD string
        """
        self.idd_file_stream = StringIO.StringIO(idd_string)
        self.file_path = "/string/idd/snippet"
        return self.process_file()

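    # Illustrative usage sketch (not part of the library source): the three workers above
    # are the public entry points and all funnel into process_file(), for example:
    #
    #     processor = IDDProcessor()
    #     idd_structure = processor.process_file_given_file_path("/path/to/Energy+.idd")
    #
    # The path shown is hypothetical; process_file_via_stream and process_file_via_string
    # behave the same way but take the IDD content directly instead of a path.
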
    def peek_one_char(self):
        """
        Internal worker function that reads a single character from the internal IDD stream but resets the stream
        to the former position

        :return: A single character, the one immediately following the cursor, or None if it can't peek ahead.
        """
        pos = self.idd_file_stream.tell()
        c = self.idd_file_stream.read(1)
        if c == "":
            c = None
        self.idd_file_stream.seek(pos)
        return c

    def read_one_char(self):
        """
        Internal worker function that reads a single character from the internal IDD stream, advancing the cursor.

        :return: A single character, the one immediately following the cursor, or None if it can't read.
        """
        c = self.idd_file_stream.read(1)
        if c == "":
            c = None
        return c

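    # Illustrative note (not an original comment): process_file() below pairs these two
    # helpers so each loop iteration sees the character just consumed plus a one-character
    # lookahead, e.g. with the stream positioned at "A1,":
    #
    #     c = self.read_one_char()   # returns "A" and advances the cursor
    #     n = self.peek_one_char()   # returns "1" without advancing
    #
    # Both helpers return None once the stream is exhausted.
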
    def process_file(self):
        """
        Internal worker function that reads the IDD stream, whether it was constructed from a file path, stream or
        string.  This state machine worker moves character by character reading tokens and processing them into a
        meaningful IDD structure.

        :return: An IDD structure describing the IDD contents
        :raises ProcessingException: for any erroneous conditions encountered during processing
        """
        # flags and miscellaneous variables
        line_index = 1  # 1-based counter for the current line of the file
        last_field_for_object = False  # this will be the last field if a semicolon is encountered
        magic_cache_key = None
        # variables used as we are building the input structure
        self.idd = IDDStructure(self.file_path)  # empty overall IDD structure
        cur_group = None  # temporary placeholder for an IDD group
        cur_object = None  # temporary placeholder for an IDD object
        cur_field = None  # temporary placeholder for an IDD field
        cur_obj_meta_data_type = None  # temporary placeholder for the type of object metadata encountered
        # variables related to building and processing tokens
        token_builder = ""
        # state machine variables
        read_status = CurrentReadType.ReadAnything  # current state machine reading status
        revert_status_after_comment = None  # reading status before the comment, shift back to this after comment's done

        # loop continuously, the loop will exit when it is done
        while True:
            # update the next character
            just_read_char = self.read_one_char()
            if not just_read_char:
                break
            # update the peeked character
            peeked_char = self.peek_one_char()
            if not peeked_char:
                peeked_char = "\n"  # to simulate that the line ended
            # jump if we are at an EOL
            if just_read_char == "\n":
                # increment the counter
                line_index += 1
            # if we aren't already processing a comment, and we have a comment:
            # don't append to the token builder, just set read status
            if read_status != CurrentReadType.EncounteredComment_ReadToCR:
                if just_read_char == "!":
                    if read_status != CurrentReadType.ReadingFieldMetaData:
                        read_status = CurrentReadType.EncounteredComment_ReadToCR
                else:
                    token_builder += just_read_char
            # clear a preceding line feed character from the token
            if just_read_char == "\n" and len(token_builder) == 1:
                token_builder = ""

            if read_status == CurrentReadType.ReadAnything:
                # this is the most general case where we are wandering through the IDD looking for whatever
                # the possibilities are: comments, group declaration, or object definition
                if peeked_char == "\\":  # starting a group name
                    read_status = CurrentReadType.ReadingGroupDeclaration
                elif peeked_char in [" ", "\n", "\t"]:
                    # don't do anything
                    pass
                elif peeked_char == "!":
                    revert_status_after_comment = read_status
                    read_status = CurrentReadType.EncounteredComment_ReadToCR
                else:
                    # should be alphanumeric, just start reading object name
                    read_status = CurrentReadType.ReadingObjectName
            elif read_status == CurrentReadType.ReadingGroupDeclaration:
                # for the group declarations, we will just check to see if the
                # line has ended since it should be on a single line
                # if it hasn't then just keep on as is, if it has, parse the group name out of it
                if peeked_char == "\n":
                    # first update the previous group
                    if cur_group is not None:
                        self.idd.groups.append(cur_group)
                    group_declaration = token_builder
                    group_flag_index = group_declaration.find(self.group_flag_string)
                    if group_flag_index == -1:  # pragma: no cover
                        # add error to error report
                        raise exceptions.ProcessingException(
                            "Group keyword not found where expected", line_index=line_index)
                    else:
                        group_declaration = group_declaration[len(self.group_flag_string):]
                    cur_group = IDDGroup(group_declaration.strip())
                    token_builder = ""
                    read_status = CurrentReadType.ReadAnything  # to start looking for groups/objects/comments/whatever
            elif read_status == CurrentReadType.ReadingObjectName:
                # the object names could have several aspects
                # they could be a single line object, such as: "Lead Input;"
                # they could be the title of a multi field object, such as: "Version,"
                # and they could of course have comments at the end
                # for now I will assume that the single line objects can't have metadata
                # so read until either a comma or semicolon, also trap for errors if we reach the end of line or comment
                if peeked_char == ",":
                    object_title = token_builder
                    cur_object = IDDObject(object_title)
                    token_builder = ""
                    self.read_one_char()  # to clear the comma
                    read_status = CurrentReadType.LookingForObjectMetaDataOrNextField
                elif peeked_char == ";":
                    # since this whole object is a single line, we can just add it directly to the current group
                    object_title = token_builder
                    # this is added to singleline objects because CurGroup isn't instantiated yet, should fix
                    self.idd.single_line_objects.append(object_title.strip())
                    token_builder = ""  # to clear the builder
                    self.read_one_char()  # to clear the semicolon
                    read_status = CurrentReadType.ReadAnything
                elif peeked_char in ["\n", "!"]:  # pragma: no cover
                    raise exceptions.ProcessingException(
                        "An object name was not properly terminated by a comma or semicolon", line_index=line_index)
            elif read_status == CurrentReadType.LookingForObjectMetaDataOrNextField:
                token_builder = ""
                if peeked_char == "\\":
                    read_status = CurrentReadType.ReadingObjectMetaData
                elif peeked_char in ["A", "N"]:
                    read_status = CurrentReadType.ReadingFieldANValue
                elif peeked_char == "!":
                    revert_status_after_comment = read_status
                    read_status = CurrentReadType.EncounteredComment_ReadToCR
                elif peeked_char == " ":
                    # just let it keep reading
                    pass
                elif peeked_char == "\n":
                    # just let it keep reading
                    pass
            elif read_status == CurrentReadType.ReadingObjectMetaData:
                if peeked_char in [" ", ":", "\n"]:
                    if token_builder in self.obj_flags:
                        cur_obj_meta_data_type = token_builder
                        token_builder = ""
                        if cur_obj_meta_data_type in ["\\obselete", "\\required-object", "\\unique-object"]:
                            # these do not carry further data, stop reading now
                            if cur_obj_meta_data_type not in cur_object.meta_data:
                                string_list = [None]
                                cur_object.meta_data[cur_obj_meta_data_type] = string_list
                            else:  # pragma: no cover -- strings already exist, this is not valid...
                                raise exceptions.ProcessingException(
                                    "Erroneous object meta data - repeated \"" + token_builder + "\"",
                                    line_index=line_index, object_name=cur_object.name)
                            cur_obj_meta_data_type = None
                            read_status = CurrentReadType.LookingForObjectMetaDataOrNextField
                        else:
                            # these will have following data, just set the flag
                            read_status = CurrentReadType.ReadingObjectMetaDataContents
                    else:  # pragma: no cover
                        # token_builder = ""
                        raise exceptions.ProcessingException(
                            "Erroneous object meta data tag found",
                            line_index=line_index, object_name=cur_object.name)
                else:
                    # just keep reading
                    pass
            elif read_status == CurrentReadType.ReadingObjectMetaDataContents:
                if peeked_char == "\n":
                    data = token_builder.strip()
                    # quick validation of some meta data
                    if cur_obj_meta_data_type == "\\min-fields":
                        try:
                            float(data)
                        except ValueError:
                            raise exceptions.ProcessingException(
                                "Erroneous meta data for min-fields, non-numeric number of fields? Weird...",
                                line_index=line_index, object_name=cur_object.name
                            )
                    if cur_obj_meta_data_type not in cur_object.meta_data:
                        string_list = [data]
                        cur_object.meta_data[cur_obj_meta_data_type] = string_list
                    else:
                        string_list = cur_object.meta_data[cur_obj_meta_data_type]
                        string_list.append(data)
                        cur_object.meta_data[cur_obj_meta_data_type] = string_list
                    token_builder = ""
                    cur_obj_meta_data_type = None
                    read_status = CurrentReadType.LookingForObjectMetaDataOrNextField
            elif read_status == CurrentReadType.ReadingFieldANValue:
                if peeked_char in [",", ";"]:
                    cur_field = IDDField(token_builder.strip())
                    token_builder = ""
                    if peeked_char == ",":
                        last_field_for_object = False
                    elif peeked_char == ";":
                        last_field_for_object = True
                    read_status = CurrentReadType.ReadingFieldMetaDataOrNextANValue
                elif peeked_char == "\n":  # pragma: no cover
                    raise exceptions.ProcessingException(
                        "Blank or erroneous ""AN"" field index value",
                        line_index=line_index, object_name=cur_object.name)
            elif read_status == CurrentReadType.ReadingFieldMetaDataOrNextANValue:
                if peeked_char == "\\":
                    token_builder = ""
                    read_status = CurrentReadType.ReadingFieldMetaData
                elif peeked_char in ["A", "N"]:
                    token_builder = ""
                    # this is hit when we have an AN value right after a previous AN value, so no meta data is added
                    if cur_field.field_name is None:
                        cur_field.field_name = ""
                    cur_object.fields.append(cur_field)
                    read_status = CurrentReadType.ReadingFieldANValue
            elif read_status == CurrentReadType.ReadingFieldMetaData:
                if peeked_char == "\n":
                    # for this one, let's read all the way to the end of the line, then parse data
                    flag_found = next((x for x in self.field_flags if x in token_builder), None)
                    if flag_found:
                        data = token_builder[len(flag_found):]
                        # data needs to start with a space, otherwise things like: \fieldd My Field would be valid
                        if len(data) > 0:
                            if data[0] not in [" ", ">", "<"]:
                                raise exceptions.ProcessingException(
                                    "Invalid meta data, expected a space after the meta data specifier before the data",
                                    line_index=line_index, object_name=cur_object.name, field_name=cur_field.field_name
                                )
                        data = data.strip()
                        if flag_found == "\\field":
                            cur_field.field_name = data
                        else:
                            if flag_found not in cur_field.meta_data:
                                string_list = [data]
                                cur_field.meta_data[flag_found] = string_list
                            else:
                                string_list = cur_field.meta_data[flag_found]
                                string_list.append(data)
                                cur_field.meta_data[flag_found] = string_list
                    else:  # pragma: no cover
                        raise exceptions.ProcessingException(
                            "Erroneous field meta data entry found",
                            line_index=line_index, object_name=cur_object.name, field_name=cur_field.field_name)
                    token_builder = ""
                    if last_field_for_object:
                        read_status = CurrentReadType.LookingForFieldMetaDataOrNextObject
                    else:
                        read_status = CurrentReadType.LookingForFieldMetaDataOrNextField
                else:
                    # just keep reading
                    pass
            elif read_status == CurrentReadType.LookingForFieldMetaDataOrNextField:
                if peeked_char in ["A", "N"]:
                    token_builder = ""
                    cur_object.fields.append(cur_field)
                    read_status = CurrentReadType.ReadingFieldANValue
                elif peeked_char == "\\":
                    token_builder = ""
                    read_status = CurrentReadType.ReadingFieldMetaData
                elif peeked_char == "!":
                    revert_status_after_comment = read_status
                    read_status = CurrentReadType.EncounteredComment_ReadToCR
                elif peeked_char == "\n":
                    # just let it keep reading
                    pass
            elif read_status == CurrentReadType.LookingForFieldMetaDataOrNextObject:
                if peeked_char == "\\":
                    token_builder = ""
                    read_status = CurrentReadType.ReadingFieldMetaData
                elif peeked_char == "\n":
                    # blank line will mean we are concluding this object
                    token_builder = ""
                    cur_object.fields.append(cur_field)
                    cur_group.objects.append(cur_object)
                    read_status = CurrentReadType.ReadAnything
            elif read_status == CurrentReadType.EncounteredComment_ReadToCR:
                # set the flag for reading the next line if necessary
                token_builder += just_read_char
                if peeked_char == "\n":
                    if revert_status_after_comment is not None:
                        read_status = revert_status_after_comment
                        revert_status_after_comment = None
                    else:
                        read_status = CurrentReadType.ReadAnything
                    if "IDD_Version" in token_builder:
                        self.idd.version_string = token_builder.strip().split(" ")[1].strip()
                        try:
                            version_tokens = self.idd.version_string.split(".")
                            tmp_string = "{}.{}".format(version_tokens[0], version_tokens[1])
                            self.idd.version_float = float(tmp_string)
                        except ValueError:
                            raise exceptions.ProcessingException(
                                "Found IDD version, but could not coerce into floating point representation")
                    elif "IDD_BUILD" in token_builder:
                        self.idd.build_string = token_builder.strip().split(" ")[1].strip()
                        magic_cache_key = "{}__{}".format(self.idd.version_string, self.idd.build_string)
                        module_logger.debug("Encountered IDD_BUILD, checking cache for key {}".format(magic_cache_key))
                        if magic_cache_key in IDD_CACHE:
                            module_logger.debug("Found this IDD cache key in the cache, using existing entry")
                            self.idd = IDD_CACHE[magic_cache_key]
                            return self.idd
                    token_builder = ""

        # end the file here, but should watch for end-of-file in other CASEs also
        self.idd.groups.append(cur_group)
        # we should assert that we have version and build strings, even in testing
        if (not self.idd.version_float) or (not self.idd.build_string):
            raise exceptions.ProcessingException("IDD did not appear to include standard version headers")
        # save this idd structure in the cache
        if magic_cache_key:
            IDD_CACHE[magic_cache_key] = self.idd
            module_logger.debug("Storing this IDD in cache with key: {}".format(magic_cache_key))
        # and return the magically useful IDDStructure instance
        return self.idd
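
# Illustrative note (not part of the library source): because IDD_CACHE is module level,
# two IDDProcessor instances that encounter the same IDD_Version/IDD_BUILD header pair
# share a single parsed structure, e.g. with a hypothetical path:
#
#     first = IDDProcessor().process_file_given_file_path("/path/to/Energy+.idd")
#     second = IDDProcessor().process_file_given_file_path("/path/to/Energy+.idd")
#     assert first is second  # second call is served from IDD_CACHE
#
# The identity check simply demonstrates the cache hit on the second pass.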