Skip to content
Snippets Groups Projects
Select Git revision
  • a2b208352aee495c5fd756d06b71d8c6f51346e4
  • master default protected
2 results

freqProcessor.py

Blame
  • freqProcessor.py 9.82 KiB
    #! /usr/bin/python
    
    import sys
    import traceback
    from org.apache.nifi.processor import AbstractProcessor
    from org.apache.nifi.processor import Relationship
    from org.apache.nifi.components import PropertyDescriptor
    from org.apache.nifi.processor.util import StandardValidators
    from org.apache.nifi.serialization import RecordReaderFactory, RecordSetWriterFactory, SimpleRecordSchema
    from org.apache.nifi.serialization.record import RecordField, RecordFieldType
    
    import os
    
    from freq import *
    
    class FreqProcessor(AbstractProcessor):
        """NiFi scripted processor that enriches records with character-frequency
        probabilities.

        For every record read from the incoming FlowFile, the string found in
        the configured input field is scored by a FreqCounter (whose frequency
        table is loaded from a file produced by freq.py), and the two resulting
        probabilities are written into two configured result fields.  The
        annotated records are written to a new FlowFile routed to 'success';
        on any error the original FlowFile is routed to 'failure'.
        """

        # Relationship descriptors
        _rel_success = Relationship.Builder().description("Success").name("success").build()
        _rel_failure = Relationship.Builder().description("Failure").name("failure").build()

        # Property descriptors
        # Record reader and writer controller services to use
        _record_reader = PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for reading incoming data").identifiesControllerService(RecordReaderFactory).required(True).build()
        _record_writer = PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing out the records").identifiesControllerService(RecordSetWriterFactory).required(True).build()
        # Record field to get the string to be analyzed from
        _input_field = PropertyDescriptor.Builder().name("Input Field").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(True).build()
        # Record fields to store the results (prob1, prob2) into
        _result_field1 = PropertyDescriptor.Builder().name("Result Field 1").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(True).build()
        _result_field2 = PropertyDescriptor.Builder().name("Result Field 2").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(True).build()
        # File with character frequency table (as created by freq.py)
        _freq_file = PropertyDescriptor.Builder().name("Frequency File").addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).required(True).build()

        def __init__(self):
            # FreqCounter instance; its table is loaded lazily on the first
            # onTrigger() call because initialize()'s context has no
            # getProperty() method (see note in onTrigger).
            self.fc = FreqCounter()
            self.fc_loaded = False

        def initialize(self, context):
            self.logger = context.getLogger()

        def setLogger(self, logger):
            self.logger = logger

        def getRelationships(self):
            return set([self._rel_success, self._rel_failure])

        def validate(self, context):
            pass

        def getPropertyDescriptors(self):
            return [self._record_reader, self._record_writer,
                    self._input_field, self._result_field1, self._result_field2,
                    self._freq_file]

        def onPropertyModified(self, descriptor, newValue, oldValue):
            pass

        def onTrigger(self, context, sessionFactory):
            """Process one FlowFile: annotate each of its records with the two
            probabilities computed by process_record().

            Reads records via the configured Record Reader, extends the schema
            with the two result fields, and writes the annotated records into a
            new FlowFile transferred to 'success'.  On any failure the new
            FlowFile (if already created) is removed and the original FlowFile
            is transferred to 'failure'.
            """
            session = sessionFactory.createSession()
            if not self.fc_loaded:
                # Load character frequency table from given file.
                # (Note: this cannot be done in initialize() since the context
                # there doesn't contain the getProperty() method.)
                self.fc.load(str(context.getProperty("Frequency File").getValue()))
                self.fc_loaded = True
                self.logger.info("Successfully loaded frequency file")

            # Get field names to work with (set in configuration).
            # NOTE(review): [1:] presumably strips a leading '/' from a
            # record-path-like value -- confirm against processor config.
            self.input_field_name = context.getProperty("Input Field").getValue()[1:]
            self.result1_field_name = context.getProperty("Result Field 1").getValue()[1:]
            self.result2_field_name = context.getProperty("Result Field 2").getValue()[1:]

            flowfile = session.get()
            if flowfile is None:
                return
            # Hoist the uuid: the original FlowFile is removed (on success)
            # before the final log line, so don't query it afterwards.
            ff_uuid = flowfile.getAttribute('uuid')

            # Fix: pre-initialize so the except-handler below cannot hit a
            # NameError when a failure happens before session.create() runs.
            newflowfile = None
            try:
                self.logger.debug("Processing FlowFile {}".format(ff_uuid))

                readerFactory = context.getProperty(self._record_reader).asControllerService(RecordReaderFactory)
                writerFactory = context.getProperty(self._record_writer).asControllerService(RecordSetWriterFactory)
                originalAttributes = flowfile.attributes
                attributes = {}
                recordCount = 0

                # Create a new FlowFile to write the result to
                # (create() method copies the attributes from the original one,
                # but not the content).
                # TODO: This results in FORK and DROP events in Data Provenance log.
                #       I would like to get CONTENT MODIFIED log (as UpdateRecord
                #       processor does) but can't find a way to do it.
                #       I can't open InputStream and OutputStream at the same time
                #       on single flowfile - that's why I need to make a new one.
                #       It probably should be done by InputOutputStream, but it
                #       seems there's no way in Python to get it (in Java it uses
                #       callbacks and method overloading, but overloading doesn't
                #       work in Python, only one of the Java methods is available
                #       in Python).
                # Fix: each resource is closed by its own finally, so nothing
                # leaks when a later open/createRecordReader call throws.
                input_stream = session.read(flowfile)
                try:
                    reader = readerFactory.createRecordReader(originalAttributes, input_stream, self.logger)
                    try:
                        newflowfile = session.create(flowfile)
                        output_stream = session.write(newflowfile)
                        try:
                            # Get the first record and process it before we create
                            # the Record Writer.  We do this so that if the
                            # Processor updates the Record's schema, we can provide
                            # an updated schema to the Record Writer.  If there are
                            # no records, we simply create the Writer with the
                            # Reader's schema and begin & end the RecordSet.
                            record = reader.nextRecord()
                            if not record:
                                # There is no record; create the Writer with the
                                # Reader's schema and begin & end the RecordSet.
                                self.logger.debug("FlowFile with no record")
                                writeSchema = writerFactory.getSchema(originalAttributes, reader.schema)
                                writer = writerFactory.createWriter(self.logger, writeSchema, output_stream)
                                try:
                                    writer.beginRecordSet()
                                    writeResult = writer.finishRecordSet()
                                    attributes['record.count'] = str(writeResult.recordCount)
                                    attributes.update(writeResult.attributes)
                                    recordCount = writeResult.recordCount
                                finally:
                                    writer.close()
                            else:
                                # Process first record.
                                self.process_record(record)

                                # There are some records to process...
                                # Add the two result fields to the schema.
                                oldSchema = writerFactory.getSchema(originalAttributes, record.schema)
                                fields = list(oldSchema.getFields())
                                fields.append(RecordField(self.result1_field_name, RecordFieldType.FLOAT.getDataType()))
                                fields.append(RecordField(self.result2_field_name, RecordFieldType.FLOAT.getDataType()))
                                newSchema = SimpleRecordSchema(fields)
                                writer = writerFactory.createWriter(self.logger, newSchema, output_stream)
                                try:
                                    writer.beginRecordSet()
                                    writer.write(record)
                                    record = reader.nextRecord()
                                    while record:
                                        # Process and write each remaining record.
                                        self.process_record(record)
                                        writer.write(record)
                                        record = reader.nextRecord()

                                    writeResult = writer.finishRecordSet()
                                    attributes['record.count'] = str(writeResult.recordCount)
                                    attributes.update(writeResult.attributes)
                                    recordCount = writeResult.recordCount
                                finally:
                                    writer.close()
                        finally:
                            output_stream.close()
                    finally:
                        reader.close()
                finally:
                    input_stream.close()
            except Exception as e:
                # Failure - print error info and route to 'failure'.
                exc_type, exc_obj, tb = sys.exc_info()
                self.logger.error("Failed to process FlowFile {}; will route to failure. Exception in '{}' at line {}: {}".format(ff_uuid, tb.tb_frame.f_code.co_filename, tb.tb_lineno, str(e)))
                if newflowfile is not None:
                    session.remove(newflowfile)
                session.transfer(flowfile, self._rel_failure)
                session.commit()
                return

            # Success
            newflowfile = session.putAllAttributes(newflowfile, attributes)
            session.remove(flowfile)
            session.transfer(newflowfile, self._rel_success)
            session.adjustCounter('Records Processed', recordCount, False)
            session.commit()
            self.logger.debug('Successfully processed {} records in FlowFile {}'.format(recordCount, ff_uuid))


        def process_record(self, record):
            """
            Processing of individual record - domain probability computation

            Get domain from record, compute probability using the FreqCounter,
            and store the resulting probabilities to the record.
            """
            text = record.getValue(self.input_field_name)
            if text is None:
                raise ValueError("Can't get value of '{}' field".format(self.input_field_name))

            prob1, prob2 = self.fc.probability(text)

            record.setValue(self.result1_field_name, prob1)
            record.setValue(self.result2_field_name, prob2)
    
    
    # Module-level instance exposed to the hosting script engine; NiFi's
    # InvokeScriptedProcessor presumably binds the variable named 'processor'
    # -- confirm against the NiFi scripted-processor documentation.
    processor = FreqProcessor()