Václav Bartoš authored
freqProcessor.py 8.91 KiB
#! /usr/bin/python
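"""
NiFi scripted processor (Jython) that computes character-frequency-based
probabilities for a string field of each record in a FlowFile.

For every record read by the configured Record Reader, the string in the
configured input field is scored by a FreqCounter (from freq.py) and the two
resulting probability values are stored in the configured result fields; the
records are then written out by the configured Record Writer.
"""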
import sys
import traceback
from org.apache.nifi.processor import AbstractProcessor
from org.apache.nifi.processor import Relationship
from org.apache.nifi.components import PropertyDescriptor
from org.apache.nifi.processor.util import StandardValidators
from org.apache.nifi.serialization import RecordReaderFactory, RecordSetWriterFactory
import os
from freq import FreqCounter
class FreqProcessor(AbstractProcessor):
# Relationship descriptors
_rel_success = Relationship.Builder().description("Success").name("success").build()
_rel_failure = Relationship.Builder().description("Failure").name("failure").build()
# Property descriptors
# Record reader and writer services to use
_record_reader = PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for reading incoming data").identifiesControllerService(RecordReaderFactory).required(True).build()
_record_writer = PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing out the records").identifiesControllerService(RecordSetWriterFactory).required(True).build()
# Record field to get the string to be analyzed from
_input_field = PropertyDescriptor.Builder().name("Input Field").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(True).build()
# Record fields to store the results (prob1, prob2) into
_result_field1 = PropertyDescriptor.Builder().name("Result Field 1").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(True).build()
_result_field2 = PropertyDescriptor.Builder().name("Result Field 2").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(True).build()
# File with character frequency table (as created by freq.py)
_freq_file = PropertyDescriptor.Builder().name("Frequency File").addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).required(True).build()
def __init__(self):
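        # FreqCounter (from freq.py) computes the probabilities; its character
        # frequency table is loaded lazily on the first trigger (see onTrigger),
        # since the configured file name is not available here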
self.fc = FreqCounter()
self.fc_loaded = False
def initialize(self, context):
self.logger = context.getLogger()
def setLogger(self, logger):
self.logger = logger
def getRelationships(self):
return set([self._rel_success, self._rel_failure])
def validate(self, context):
pass
def getPropertyDescriptors(self):
return [self._record_reader, self._record_writer,
self._input_field, self._result_field1, self._result_field2,
self._freq_file]
def onPropertyModified(self, descriptor, newValue, oldValue):
pass
def onTrigger(self, context, sessionFactory):
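        # Called when the processor is scheduled to run: read all records from
        # an incoming FlowFile, add the two probability fields to each record,
        # and write the records into a new FlowFile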
session = sessionFactory.createSession()
if not self.fc_loaded:
# Load character frequency table from given file
# (Note: this cannot be done in initialize() since the context there doesn't contain the getProperty() method)
self.fc.load(str(context.getProperty("Frequency File").getValue()))
            self.fc_loaded = True
            self.logger.info("Successfully loaded frequency file")
# Get field names to work with (set in configuration)
self.input_field_name = context.getProperty("Input Field").getValue()
self.result1_field_name = context.getProperty("Result Field 1").getValue()
self.result2_field_name = context.getProperty("Result Field 2").getValue()
        newflowfile = None
        try:
            flowfile = session.get()
            if flowfile is None:
return
self.logger.debug("Processing FlowFile {}".format(flowfile.getAttribute('uuid')))
readerFactory = context.getProperty(self._record_reader).asControllerService(RecordReaderFactory)
writerFactory = context.getProperty(self._record_writer).asControllerService(RecordSetWriterFactory)
originalAttributes = flowfile.attributes
attributes = {}
recordCount = 0
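            # Open the incoming FlowFile's content and create a record reader over it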
input_stream = session.read(flowfile)
reader = readerFactory.createRecordReader(originalAttributes, input_stream, self.logger)
# Create a new FlowFile to write the result to
# (create() method copies the attributes from the original one, but not the content)
            # TODO: This results in FORK and DROP events in the Data Provenance log.
            #   I would like to get a CONTENT MODIFIED event (as the UpdateRecord
            #   processor produces) but can't find a way to do it.
            #   I can't open an InputStream and an OutputStream at the same time
            #   on a single FlowFile - that's why I need to create a new one.
            #   It should probably be done via an InputOutputStream, but there
            #   seems to be no way to get one from Python (in Java it uses
            #   callbacks and method overloading, but overloading doesn't work
            #   in Python - only one of the Java methods is available).
newflowfile = session.create(flowfile)
output_stream = session.write(newflowfile)
try:
# Get the first record and process it before we create the Record Writer.
# We do this so that if the Processor updates the Record's schema, we can provide
# an updated schema to the Record Writer. If there are no records, then we can
# simply create the Writer with the Reader's schema and begin & end the RecordSet
record = reader.nextRecord()
self.logger.debug("Record read (first in batch): " + str(record))
if not record:
# There is no record, just create the Writer with the Reader's schema and begin & end the RecordSet
self.logger.debug("FlowFile with no record")
writeSchema = writerFactory.getSchema(originalAttributes, reader.schema)
writer = writerFactory.createWriter(self.logger, writeSchema, output_stream)
try:
writer.beginRecordSet()
writeResult = writer.finishRecordSet()
attributes['record.count'] = str(writeResult.recordCount)
attributes.update(writeResult.attributes)
recordCount = writeResult.recordCount
finally:
writer.close()
else:
# process first record
self.process_record(record)
# There are some records to process ...
writeSchema = writerFactory.getSchema(originalAttributes, record.schema)
writer = writerFactory.createWriter(self.logger, writeSchema, output_stream)
try:
writer.beginRecordSet()
writer.write(record)
record = reader.nextRecord()
while record:
self.logger.debug("Record read: " + str(record))
# process record
self.process_record(record)
writer.write(record)
record = reader.nextRecord()
writeResult = writer.finishRecordSet()
attributes['record.count'] = str(writeResult.recordCount)
attributes.update(writeResult.attributes)
recordCount = writeResult.recordCount
finally:
writer.close()
finally:
reader.close()
        except Exception as e:
            # Failure - drop the partially written copy (if any) and route the
            # original FlowFile to the failure relationship
            self.logger.error("Failed to process {}; will route to failure. {}".format(flowfile.getAttribute('uuid'), str(e)))
            if newflowfile is not None:
                session.remove(newflowfile)
            session.transfer(flowfile, self._rel_failure)
            session.commit()
            return
# Success
newflowfile = session.putAllAttributes(newflowfile, attributes)
session.remove(flowfile)
session.transfer(newflowfile, self._rel_success)
session.adjustCounter('Records Processed', recordCount, False)
session.commit()
self.logger.debug('Successfully processed {} records in FlowFile {}'.format(recordCount, flowfile.getAttribute('uuid')))
def process_record(self, record):
"""
Processing of individual record - domain probability computation
Get domain form record, compute probability using the FreqCounter,
and store the resulting probabilities to the record.
"""
text = record.getValue(self.input_field_name)
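        # FreqCounter.probability() returns a pair of probability measures for
        # the given string (see freq.py for how the two metrics differ)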
prob1, prob2 = self.fc.probability(text)
record.setValue(self.result1_field_name, prob1)
record.setValue(self.result2_field_name, prob2)
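
# NiFi's InvokeScriptedProcessor looks for a script-level variable named
# "processor" holding the processor instance - presumably how this script is
# meant to be deployed: point InvokeScriptedProcessor at this file and
# configure the Record Reader/Writer services and the Input Field,
# Result Field 1/2 and Frequency File properties defined above.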
processor = FreqProcessor()