Skip to content
Snippets Groups Projects
freqProcessor.py 8.91 KiB
#! /usr/bin/python

import sys
import traceback
from org.apache.nifi.processor import AbstractProcessor
from org.apache.nifi.processor import Relationship
from org.apache.nifi.components import PropertyDescriptor
from org.apache.nifi.processor.util import StandardValidators
from org.apache.nifi.serialization import RecordReaderFactory, RecordSetWriterFactory
import os

from freq import *

class FreqProcessor(AbstractProcessor):
    """
    NiFi record processor that scores one field of every record with a
    FreqCounter.

    For each record of an incoming FlowFile the value of the configured
    input field is passed to FreqCounter.probability() and the two returned
    values are stored into the two configured result fields. The modified
    records are written to a new FlowFile (a child of the incoming one) that
    is routed to "success"; the incoming FlowFile is removed. If anything
    fails, the incoming FlowFile is routed to "failure" unchanged.
    """

    # Relationship descriptors
    _rel_success = Relationship.Builder().description("Success").name("success").build()
    _rel_failure = Relationship.Builder().description("Failure").name("failure").build()

    # Property descriptors
    # Record reader and writer services to use
    _record_reader = (PropertyDescriptor.Builder()
                      .name("record-reader")
                      .displayName("Record Reader")
                      .description("Specifies the Controller Service to use for reading incoming data")
                      .identifiesControllerService(RecordReaderFactory)
                      .required(True)
                      .build())
    _record_writer = (PropertyDescriptor.Builder()
                      .name("record-writer")
                      .displayName("Record Writer")
                      .description("Specifies the Controller Service to use for writing out the records")
                      .identifiesControllerService(RecordSetWriterFactory)
                      .required(True)
                      .build())
    # Record field to get the string to be analyzed from
    _input_field = (PropertyDescriptor.Builder()
                    .name("Input Field")
                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                    .required(True)
                    .build())
    # Record fields to store the results (prob1, prob2) into
    _result_field1 = (PropertyDescriptor.Builder()
                      .name("Result Field 1")
                      .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                      .required(True)
                      .build())
    _result_field2 = (PropertyDescriptor.Builder()
                      .name("Result Field 2")
                      .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                      .required(True)
                      .build())
    # File with character frequency table (as created by freq.py)
    _freq_file = (PropertyDescriptor.Builder()
                  .name("Frequency File")
                  .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
                  .required(True)
                  .build())

    def __init__(self):
        """
        Create the processor with an empty, not yet loaded FreqCounter.
        """
        self.fc = FreqCounter()
        # The frequency table is loaded lazily on the first onTrigger() call
        self.fc_loaded = False

    def initialize(self, context):
        """
        Remember the logger supplied by the initialization context.
        """
        self.logger = context.getLogger()

    def setLogger(self, logger):
        """
        Replace the logger used by this processor.
        """
        self.logger = logger

    def getRelationships(self):
        """
        Return the set of relationships ("success", "failure").
        """
        return set([self._rel_success, self._rel_failure])

    def validate(self, context):
        """
        No custom validation is performed (returns None).
        """
        pass

    def getPropertyDescriptors(self):
        """
        Return the list of properties supported by this processor.
        """
        return [self._record_reader, self._record_writer,
                self._input_field, self._result_field1, self._result_field2,
                self._freq_file]

    def onPropertyModified(self, descriptor, newValue, oldValue):
        """
        Force a reload of the frequency table when its file property changes.

        Without this, the table loaded on the first onTrigger() call would be
        kept forever, even after "Frequency File" was pointed elsewhere.

        NOTE(review): NiFi's ConfigurableComponent interface declares this
        callback as (descriptor, oldValue, newValue), so the two value
        parameter names here look swapped - confirm. The values are not used,
        so this is harmless.
        """
        if descriptor is not None and descriptor.getName() == self._freq_file.getName():
            # Start from a fresh counter so that data from the previous file
            # cannot be mixed with the newly configured one.
            self.fc = FreqCounter()
            self.fc_loaded = False

    def onTrigger(self, context, sessionFactory):
        """
        Process one FlowFile: score all its records and write them out.

        Creates a session, takes one FlowFile from it (returning immediately
        if there is none), reads its records with the configured Record
        Reader, runs process_record() on each of them and writes them with
        the configured Record Writer into a new FlowFile. On success the new
        FlowFile gets the writer's attributes plus 'record.count' and goes to
        "success", and the original one is removed. On any error the
        partially written FlowFile is dropped and the original one goes to
        "failure".
        """
        session = sessionFactory.createSession()
        if not self.fc_loaded:
            # Load character frequency table from given file
            # (Note: this cannot be done in initialize() since the context there doesn't contain the getProperty() method)
            self.fc.load(str(context.getProperty("Frequency File").getValue()))
            self.fc_loaded = True
            self.logger.info("Sucessfully loaded frequency file")

        # Get field names to work with (set in configuration)
        self.input_field_name = context.getProperty("Input Field").getValue()
        self.result1_field_name = context.getProperty("Result Field 1").getValue()
        self.result2_field_name = context.getProperty("Result Field 2").getValue()

        # Fetched outside of the try block: the error handler below needs a
        # valid flowfile to log and route to failure.
        flowfile = session.get()
        if flowfile is None:
            return

        # Resources that the error handler may have to clean up
        newflowfile = None
        input_stream = None
        output_stream = None
        attributes = {}
        recordCount = 0

        try:
            self.logger.debug("Processing FlowFile {}".format(flowfile.getAttribute('uuid')))

            readerFactory = context.getProperty(self._record_reader).asControllerService(RecordReaderFactory)
            writerFactory = context.getProperty(self._record_writer).asControllerService(RecordSetWriterFactory)
            originalAttributes = flowfile.attributes

            input_stream = session.read(flowfile)
            reader = readerFactory.createRecordReader(originalAttributes, input_stream, self.logger)
            try:
                # Create a new FlowFile to write the result to
                # (create() method copies the attributes from the original one, but not the content)
                # TODO: This results in FORK and DROP events in Data Provenance log.
                #       I would like to get CONTENT MODIFIED log (as UpdateRecord
                #       processor does) but can't find a way to do it.
                #       I can't open InputStream and OutputStream at the same time
                #       on single flowfile - that's why I need to make a new one.
                #       It probably should be done by InputOutputStream, but it
                #       seems there's no way in Python to get it (in Java it uses
                #       callbacks and method overloading, but overloading doesn't
                #       work in Python, only one of the Java methods is available
                #       in Python).
                newflowfile = session.create(flowfile)
                output_stream = session.write(newflowfile)

                writeResult = self._rewrite_records(reader, writerFactory, originalAttributes, output_stream)
                attributes['record.count'] = str(writeResult.recordCount)
                attributes.update(writeResult.attributes)
                recordCount = writeResult.recordCount
            finally:
                reader.close()
        except Exception as e:
            # Failure
            self.logger.error("Failed to process {}; will route to failure. {}".format(flowfile.getAttribute('uuid'), str(e)))
            # Streams obtained from the session must be closed before the
            # FlowFiles they belong to can be removed or transferred.
            self._close_quietly(output_stream)
            self._close_quietly(input_stream)
            # Every FlowFile created in the session has to be removed or
            # transferred before commit; drop the partially written one.
            if newflowfile is not None:
                session.remove(newflowfile)
            session.transfer(flowfile, self._rel_failure)
            session.commit()
            return

        # Success
        newflowfile = session.putAllAttributes(newflowfile, attributes)
        session.remove(flowfile)
        session.transfer(newflowfile, self._rel_success)
        session.adjustCounter('Records Processed', recordCount, False)
        session.commit()
        self.logger.debug('Successfully processed {} records in FlowFile {}'.format(recordCount, flowfile.getAttribute('uuid')))

    def _rewrite_records(self, reader, writerFactory, originalAttributes, output_stream):
        """
        Read all records from reader, process them and write them out.

        The first record is processed before the Record Writer is created, so
        that the writer gets the schema of the already updated record. If
        there are no records, the writer is created with the reader's schema
        and an empty record set is written.

        Returns the object returned by the writer's finishRecordSet().
        The writer is always closed; closing the reader is up to the caller.
        """
        debug_enabled = self.logger.isDebugEnabled()

        record = reader.nextRecord()
        if debug_enabled:
            self.logger.debug("Record read (first in batch): " + str(record))

        if record is None:
            self.logger.debug("FlowFile with no record")
            schema = reader.schema
        else:
            self.process_record(record)
            schema = record.schema

        writeSchema = writerFactory.getSchema(originalAttributes, schema)
        writer = writerFactory.createWriter(self.logger, writeSchema, output_stream)
        try:
            writer.beginRecordSet()
            # 'record' (if any) has already been processed at this point
            while record is not None:
                writer.write(record)
                record = reader.nextRecord()
                if record is not None:
                    if debug_enabled:
                        self.logger.debug("Record read: " + str(record))
                    self.process_record(record)
            return writer.finishRecordSet()
        finally:
            writer.close()

    def _close_quietly(self, stream):
        """
        Close the given stream (if any), ignoring errors raised by close().
        """
        if stream is None:
            return
        try:
            stream.close()
        except Exception as e:
            self.logger.debug("Ignoring error while closing stream: " + str(e))

    def process_record(self, record):
        """
        Processing of individual record - domain probability computation

        Get domain from record, compute probability using the FreqCounter,
        and store the resulting probabilities to the record.

        The field names used are the ones stored on the instance by
        onTrigger() (input_field_name, result1_field_name,
        result2_field_name), so onTrigger() must have run first.
        """
        text = record.getValue(self.input_field_name)

        prob1, prob2 = self.fc.probability(text)

        record.setValue(self.result1_field_name, prob1)
        record.setValue(self.result2_field_name, prob2)


# Module-level instance of the processor.
# NOTE(review): presumably the NiFi component hosting this script looks the
# instance up by the variable name `processor` - confirm before renaming.
processor = FreqProcessor()