Skip to content
Snippets Groups Projects
Commit 0dac0b06 authored by Václav Bartoš's avatar Václav Bartoš
Browse files

FreqProcessor rewritten to support Record Processing

parent 6f2602ea
No related branches found
No related tags found
No related merge requests found
...@@ -2,64 +2,179 @@ ...@@ -2,64 +2,179 @@
import sys import sys
import traceback import traceback
from org.apache.nifi.processor import Processor from org.apache.nifi.processor import AbstractProcessor
from org.apache.nifi.processor import Relationship from org.apache.nifi.processor import Relationship
from org.apache.nifi.components import PropertyDescriptor from org.apache.nifi.components import PropertyDescriptor
from org.apache.nifi.processor.util import StandardValidators from org.apache.nifi.processor.util import StandardValidators
from org.apache.nifi.serialization import RecordReaderFactory, RecordSetWriterFactory
import os import os
from freq import * from freq import *
class FreqProcessor(AbstractProcessor):
    """
    NiFi record-oriented processor that scores strings by character-frequency
    analysis.

    For every record of an incoming FlowFile, the value of the configured
    "Input Field" is scored with FreqCounter.probability() (which yields two
    probability values) and the results are stored into "Result Field 1" and
    "Result Field 2". The rewritten records are emitted in a new FlowFile
    routed to 'success'; on any processing error the original FlowFile is
    routed to 'failure'.
    """

    # Relationship descriptors
    _rel_success = Relationship.Builder().description("Success").name("success").build()
    _rel_failure = Relationship.Builder().description("Failure").name("failure").build()

    # Property descriptors
    # Record reader and writer services to use
    _record_reader = PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for reading incoming data").identifiesControllerService(RecordReaderFactory).required(True).build()
    _record_writer = PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing out the records").identifiesControllerService(RecordSetWriterFactory).required(True).build()
    # Record field to get the string to be analyzed from
    _input_field = PropertyDescriptor.Builder().name("Input Field").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(True).build()
    # Record fields to store the results (prob1, prob2) into
    _result_field1 = PropertyDescriptor.Builder().name("Result Field 1").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(True).build()
    _result_field2 = PropertyDescriptor.Builder().name("Result Field 2").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(True).build()
    # File with character frequency table (as created by freq.py)
    _freq_file = PropertyDescriptor.Builder().name("Frequency File").addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).required(True).build()

    def __init__(self):
        # Frequency counter; its table is loaded lazily in onTrigger(),
        # because property values are not available in initialize().
        self.fc = FreqCounter()
        self.fc_loaded = False

    def initialize(self, context):
        # Only the logger is available from this context; properties are not.
        self.logger = context.getLogger()

    def getRelationships(self):
        return set([self._rel_success, self._rel_failure])

    def validate(self, context):
        pass

    def getPropertyDescriptors(self):
        return [self._record_reader, self._record_writer,
                self._input_field, self._result_field1, self._result_field2,
                self._freq_file]

    def onPropertyModified(self, descriptor, newValue, oldValue):
        pass

    def onTrigger(self, context, sessionFactory):
        """
        Process one FlowFile: read its records, score each one via
        process_record(), and write the updated records to a new FlowFile.
        """
        session = sessionFactory.createSession()
        if not self.fc_loaded:
            # Load character frequency table from given file
            # (Note: this cannot be done in initialize() since the context there doesn't contain the getProperty() method)
            self.fc.load(str(context.getProperty("Frequency File").getValue()))
            self.fc_loaded = True
            self.logger.info("Successfully loaded frequency file")

        # Get field names to work with (set in configuration)
        self.input_field_name = context.getProperty("Input Field").getValue()
        self.result1_field_name = context.getProperty("Result Field 1").getValue()
        self.result2_field_name = context.getProperty("Result Field 2").getValue()

        # Pre-bind flowfile so the except-clause below can safely test it even
        # when session.get() itself raises (avoids a masking NameError).
        flowfile = None
        try:
            flowfile = session.get()
            if flowfile is None:
                return
            self.logger.debug("Processing FlowFile {}".format(flowfile.getAttribute('uuid')))

            readerFactory = context.getProperty(self._record_reader).asControllerService(RecordReaderFactory)
            writerFactory = context.getProperty(self._record_writer).asControllerService(RecordSetWriterFactory)

            originalAttributes = flowfile.attributes
            attributes = {}
            recordCount = 0

            input_stream = session.read(flowfile)
            reader = readerFactory.createRecordReader(originalAttributes, input_stream, self.logger)

            # Create a new FlowFile to write the result to
            # (create() method copies the attributes from the original one, but not the content)
            # TODO: This results in FORK and DROP events in Data Provenance log.
            #   I would like to get CONTENT MODIFIED log (as UpdateRecord
            #   processor does) but can't find a way to do it.
            #   I can't open InputStream and OutputStream at the same time
            #   on single flowfile - that's why I need to make a new one.
            #   It probably should be done by InputOutputStream, but it
            #   seems there's no way in Python to get it (in Java it uses
            #   callbacks and method overloading, but overloading doesn't
            #   work in Python, only one of the Java methods is available
            #   in Python).
            newflowfile = session.create(flowfile)
            output_stream = session.write(newflowfile)
            try:
                # Get the first record and process it before we create the Record Writer.
                # We do this so that if the Processor updates the Record's schema, we can provide
                # an updated schema to the Record Writer. If there are no records, then we can
                # simply create the Writer with the Reader's schema and begin & end the RecordSet
                record = reader.nextRecord()
                self.logger.debug("Record read (first in batch): " + str(record))
                if not record:
                    # There is no record, just create the Writer with the Reader's schema and begin & end the RecordSet
                    self.logger.debug("FlowFile with no record")
                    writeSchema = writerFactory.getSchema(originalAttributes, reader.schema)
                    writer = writerFactory.createWriter(self.logger, writeSchema, output_stream)
                    try:
                        writer.beginRecordSet()
                        writeResult = writer.finishRecordSet()
                        attributes['record.count'] = str(writeResult.recordCount)
                        attributes.update(writeResult.attributes)
                        recordCount = writeResult.recordCount
                    finally:
                        writer.close()
                else:
                    # process first record
                    self.process_record(record)
                    # There are some records to process ...
                    writeSchema = writerFactory.getSchema(originalAttributes, record.schema)
                    writer = writerFactory.createWriter(self.logger, writeSchema, output_stream)
                    try:
                        writer.beginRecordSet()
                        writer.write(record)
                        record = reader.nextRecord()
                        while record:
                            self.logger.debug("Record read: " + str(record))
                            # process record
                            self.process_record(record)
                            writer.write(record)
                            record = reader.nextRecord()
                        writeResult = writer.finishRecordSet()
                        attributes['record.count'] = str(writeResult.recordCount)
                        attributes.update(writeResult.attributes)
                        recordCount = writeResult.recordCount
                    finally:
                        writer.close()
            finally:
                reader.close()
        except Exception as e:
            # Failure: report the error and route the original FlowFile (if
            # one was obtained at all) to the 'failure' relationship.
            uuid = flowfile.getAttribute('uuid') if flowfile is not None else None
            self.logger.error("Failed to process {}; will route to failure. {}".format(uuid, str(e)))
            if flowfile is not None:
                session.transfer(flowfile, self._rel_failure)
            session.commit()
            return

        # Success: the new FlowFile replaces the original one.
        newflowfile = session.putAllAttributes(newflowfile, attributes)
        session.remove(flowfile)
        session.transfer(newflowfile, self._rel_success)
        session.adjustCounter('Records Processed', recordCount, False)
        session.commit()
        self.logger.debug('Successfully processed {} records in FlowFile {}'.format(recordCount, flowfile.getAttribute('uuid')))

    def process_record(self, record):
        """
        Processing of individual record - domain probability computation

        Get domain from record, compute probability using the FreqCounter,
        and store the resulting probabilities to the record.
        """
        text = record.getValue(self.input_field_name)
        prob1, prob2 = self.fc.probability(text)
        record.setValue(self.result1_field_name, prob1)
        record.setValue(self.result2_field_name, prob2)
# Module-level instance exposed to the host.
# NOTE(review): presumably the NiFi scripted-processor mechanism
# (InvokeScriptedProcessor) picks up a variable named 'processor' from the
# script's scope — confirm against the NiFi scripting documentation.
processor = FreqProcessor()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment