diff --git a/scripts/freq/freqProcessor.py b/scripts/freq/freqProcessor.py
index 13599ad67ecd91995babe27cb2c32edb997d2b16..047e9e4635fd4e7ca87a96b0ab2f16021103ccf5 100755
--- a/scripts/freq/freqProcessor.py
+++ b/scripts/freq/freqProcessor.py
@@ -1,4 +1,5 @@
 #! /usr/bin/python
+# This is a version for NiFi v.1.11.3
 
 import sys
 import traceback
@@ -55,6 +56,9 @@ class FreqProcessor(AbstractProcessor):
     def onPropertyModified(self, descriptor, newValue, oldValue):
         pass
 
+    def onStopped(self, context):
+        pass
+
     def onTrigger(self, context, sessionFactory):
         session = sessionFactory.createSession()
         if not self.fc_loaded:
@@ -62,7 +66,7 @@ class FreqProcessor(AbstractProcessor):
             # (Note: this cannot be done in initialize() since the context there doesn't contain the getProperty() method)
             self.fc.load(str(context.getProperty("Frequency File").evaluateAttributeExpressions().getValue()))
             self.fc_loaded=True
-            self.logger.info("Sucessfully loaded frequency file")
+            self.logger.debug("Successfully loaded frequency file")
 
         # Get field names to work with (set in configuration)
         # (remove leading '/' if present to simulate trivial support of RecordPath)
@@ -84,7 +88,8 @@ class FreqProcessor(AbstractProcessor):
             recordCount = 0
 
             input_stream = session.read(flowfile)
-            reader = readerFactory.createRecordReader(originalAttributes, input_stream, self.logger)
+            # ref: https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java
+            reader = readerFactory.createRecordReader(originalAttributes, input_stream, -1, self.logger)
 
             # Create a new FlowFile to write the result to
             # (create() method copies the attributes from the original one, but not the content)
@@ -98,6 +103,9 @@ class FreqProcessor(AbstractProcessor):
             # callbacks and method overloading, but overloading doesn't
             # work in Python, only one of the Java methods is available
             # in Python).
+            # Later note: That's not true, overloading should be resolved
+            # automatically by the number of parameters and their types.
+            # -> try it again
            newflowfile = session.create(flowfile)
             output_stream = session.write(newflowfile)
 
@@ -107,12 +115,13 @@ class FreqProcessor(AbstractProcessor):
                 # an updated schema to the Record Writer. If there are no records, then we can
                 # simply create the Writer with the Reader's schema and begin & end the RecordSet
                 record = reader.nextRecord()
-                #self.logger.debug("Record read (first in batch): " + record.encode('utf-8'))
+                #self.logger.debug("Record read (first in batch): " + unicode(record, encoding='utf-8'))
                 if not record:
                     # There is no record, just create the Writer with the Reader's schema and begin & end the RecordSet
                     self.logger.debug("FlowFile with no record")
                     writeSchema = writerFactory.getSchema(originalAttributes, reader.schema)
-                    writer = writerFactory.createWriter(self.logger, writeSchema, output_stream)
+                    # ref: https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
+                    writer = writerFactory.createWriter(self.logger, writeSchema, output_stream, None)
                     try:
                         writer.beginRecordSet()
                         writeResult = writer.finishRecordSet()
@@ -127,15 +136,25 @@ class FreqProcessor(AbstractProcessor):
                 # There are some records to process ...
 
                     # Add new fields to the schema
+                    #self.logger.debug("origAttributes: " + str(originalAttributes))
                     oldSchema = writerFactory.getSchema(originalAttributes, record.schema)
                     fields = list(oldSchema.getFields())
-                    fields.append(RecordField(self.result1_field_name, RecordFieldType.FLOAT.getDataType()))
-                    fields.append(RecordField(self.result2_field_name, RecordFieldType.FLOAT.getDataType()))
-                    newSchema = SimpleRecordSchema(fields)
-                    writer = writerFactory.createWriter(self.logger, newSchema, output_stream)
+                    field_names = [f.getFieldName() for f in fields]
+                    # ref: https://github.com/apache/nifi/blob/master/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
+                    if self.result1_field_name not in field_names:
+                        fields.append(RecordField(self.result1_field_name, RecordFieldType.FLOAT.getDataType(), False))
+                    if self.result2_field_name not in field_names:
+                        fields.append(RecordField(self.result2_field_name, RecordFieldType.FLOAT.getDataType(), False))
+                    newSchema = SimpleRecordSchema(fields, oldSchema.getIdentifier())
+
+                    # Create writer
+                    # ref: https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
+                    writer = writerFactory.createWriter(self.logger, newSchema, output_stream, None)
                     try:
+                        # write record; process and write all further records in a loop...
                         writer.beginRecordSet()
                         writer.write(record)
+                        record = reader.nextRecord()
                         while record:
                             #self.logger.debug("Record read: " + unicode(record, encoding='utf-8'))
 
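A note on the two reader/writer calls changed above: NiFi 1.10 extended the record serialization API, so RecordReaderFactory.createRecordReader() now also takes the input length (-1 when unknown) and RecordSetWriterFactory.createWriter() takes a map of additional variables (None when there are none). Instead of keeping one copy of the script per NiFi version (as done below), the two calls could be wrapped in small compatibility helpers. A minimal, untested sketch - the helper names are made up, and it assumes Jython raises TypeError when no matching Java overload exists:

    def create_reader(readerFactory, attributes, input_stream, logger):
        try:
            # NiFi >= 1.10: createRecordReader(variables, in, inputLength, logger)
            return readerFactory.createRecordReader(attributes, input_stream, -1, logger)
        except TypeError:
            # NiFi <= 1.9: createRecordReader(variables, in, logger)
            return readerFactory.createRecordReader(attributes, input_stream, logger)

    def create_writer(writerFactory, logger, schema, output_stream):
        try:
            # NiFi >= 1.10: createWriter(logger, schema, out, variables)
            return writerFactory.createWriter(logger, schema, output_stream, None)
        except TypeError:
            # NiFi <= 1.9: createWriter(logger, schema, out)
            return writerFactory.createWriter(logger, schema, output_stream)

The version-specific file added below keeps the pre-1.10 signatures instead: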
diff --git a/scripts/freq/freqProcessor_nifi1.8.0.py b/scripts/freq/freqProcessor_nifi1.8.0.py
new file mode 100644
index 0000000000000000000000000000000000000000..e40d5c08f29a982f7d51601c8d5f06e3d73ab4cd
--- /dev/null
+++ b/scripts/freq/freqProcessor_nifi1.8.0.py
@@ -0,0 +1,196 @@
+#! /usr/bin/python
+# This is a version for NiFi v.1.8.0
+
+import sys
+import traceback
+from org.apache.nifi.processor import AbstractProcessor
+from org.apache.nifi.processor import Relationship
+from org.apache.nifi.components import PropertyDescriptor
+from org.apache.nifi.expression import ExpressionLanguageScope
+from org.apache.nifi.processor.util import StandardValidators
+from org.apache.nifi.serialization import RecordReaderFactory, RecordSetWriterFactory, SimpleRecordSchema
+from org.apache.nifi.serialization.record import RecordField, RecordFieldType
+
+import os
+
+from freq import *
+
+class FreqProcessor(AbstractProcessor):
+    # Relationship descriptors
+    _rel_success = Relationship.Builder().description("Success").name("success").build()
+    _rel_failure = Relationship.Builder().description("Failure").name("failure").build()
+
+    # Property descriptors
+    # Record reader and writer services to use
+    _record_reader = PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for reading incoming data").identifiesControllerService(RecordReaderFactory).required(True).build()
+    _record_writer = PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing out the records").identifiesControllerService(RecordSetWriterFactory).required(True).build()
+    # Record field to get the string to be analyzed from
+    _input_field = PropertyDescriptor.Builder().name("Input Field").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(True).build()
+    # Record fields to store the results (prob1, prob2) into
+    _result_field1 = PropertyDescriptor.Builder().name("Result Field 1").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(True).build()
+    _result_field2 = PropertyDescriptor.Builder().name("Result Field 2").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(True).build()
+    # File with character frequency table (as created by freq.py)
+    _freq_file = PropertyDescriptor.Builder().name("Frequency File").addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(True).build()
+
+    def __init__(self):
+        self.fc = FreqCounter()
+        self.fc_loaded = False
+
+    def initialize(self, context):
+        self.logger = context.getLogger()
+
+    def setLogger(self, logger):
+        self.logger = logger
+
+    def getRelationships(self):
+        return set([self._rel_success, self._rel_failure])
+
+    def validate(self, context):
+        pass
+
+    def getPropertyDescriptors(self):
+        return [self._record_reader, self._record_writer,
+                self._input_field, self._result_field1, self._result_field2,
+                self._freq_file]
+
+    def onPropertyModified(self, descriptor, newValue, oldValue):
+        pass
+
+    def onTrigger(self, context, sessionFactory):
+        session = sessionFactory.createSession()
+        if not self.fc_loaded:
+            # Load character frequency table from given file
+            # (Note: this cannot be done in initialize() since the context there doesn't contain the getProperty() method)
+            self.fc.load(str(context.getProperty("Frequency File").evaluateAttributeExpressions().getValue()))
+            self.fc_loaded=True
+            self.logger.info("Successfully loaded frequency file")
+
+        # Get field names to work with (set in configuration)
+        # (remove leading '/' if present to simulate trivial support of RecordPath)
+        self.input_field_name = context.getProperty("Input Field").evaluateAttributeExpressions().getValue().lstrip('/')
+        self.result1_field_name = context.getProperty("Result Field 1").evaluateAttributeExpressions().getValue().lstrip('/')
+        self.result2_field_name = context.getProperty("Result Field 2").evaluateAttributeExpressions().getValue().lstrip('/')
+
+        try:
+            flowfile = session.get()
+            if flowfile is None:
+                return
+
+            self.logger.debug("Processing FlowFile {}".format(flowfile.getAttribute('uuid')))
+
+            readerFactory = context.getProperty(self._record_reader).asControllerService(RecordReaderFactory)
+            writerFactory = context.getProperty(self._record_writer).asControllerService(RecordSetWriterFactory)
+            originalAttributes = flowfile.attributes
+            attributes = {}
+            recordCount = 0
+
+            input_stream = session.read(flowfile)
+            reader = readerFactory.createRecordReader(originalAttributes, input_stream, self.logger)
+
+            # Create a new FlowFile to write the result to
+            # (create() method copies the attributes from the original one, but not the content)
+            # TODO: This results in FORK and DROP events in Data Provenance log.
+            # I would like to get CONTENT MODIFIED log (as UpdateRecord
+            # processor does) but can't find a way to do it.
+            # I can't open InputStream and OutputStream at the same time
+            # on single flowfile - that's why I need to make a new one.
+            # It probably should be done by InputOutputStream, but it
+            # seems there's no way in Python to get it (in Java it uses
+            # callbacks and method overloading, but overloading doesn't
+            # work in Python, only one of the Java methods is available
+            # in Python).
+            newflowfile = session.create(flowfile)
+            output_stream = session.write(newflowfile)
+
+            try:
+                # Get the first record and process it before we create the Record Writer.
+                # We do this so that if the Processor updates the Record's schema, we can provide
+                # an updated schema to the Record Writer. If there are no records, then we can
+                # simply create the Writer with the Reader's schema and begin & end the RecordSet
+                record = reader.nextRecord()
+                #self.logger.debug("Record read (first in batch): " + record.encode('utf-8'))
+                if not record:
+                    # There is no record, just create the Writer with the Reader's schema and begin & end the RecordSet
+                    self.logger.debug("FlowFile with no record")
+                    writeSchema = writerFactory.getSchema(originalAttributes, reader.schema)
+                    writer = writerFactory.createWriter(self.logger, writeSchema, output_stream)
+                    try:
+                        writer.beginRecordSet()
+                        writeResult = writer.finishRecordSet()
+                        attributes['record.count'] = str(writeResult.recordCount)
+                        attributes.update(writeResult.attributes)
+                        recordCount = writeResult.recordCount
+                    finally:
+                        writer.close()
+                else:
+                    # process first record
+                    self.process_record(record)
+
+                    # There are some records to process ...
+                    # Add new fields to the schema
+                    oldSchema = writerFactory.getSchema(originalAttributes, record.schema)
+                    fields = list(oldSchema.getFields())
+                    fields.append(RecordField(self.result1_field_name, RecordFieldType.FLOAT.getDataType()))
+                    fields.append(RecordField(self.result2_field_name, RecordFieldType.FLOAT.getDataType()))
+                    newSchema = SimpleRecordSchema(fields)
+                    writer = writerFactory.createWriter(self.logger, newSchema, output_stream)
+                    try:
+                        writer.beginRecordSet()
+                        writer.write(record)
+                        record = reader.nextRecord()
+                        while record:
+                            #self.logger.debug("Record read: " + unicode(record, encoding='utf-8'))
+
+                            # process record
+                            self.process_record(record)
+
+                            writer.write(record)
+
+                            record = reader.nextRecord()
+
+                        writeResult = writer.finishRecordSet()
+                        attributes['record.count'] = str(writeResult.recordCount)
+                        attributes.update(writeResult.attributes)
+                        recordCount = writeResult.recordCount
+                    finally:
+                        writer.close()
+            finally:
+                reader.close()
+                input_stream.close()
+                output_stream.close()
+        except Exception as e:
+            # Failure - print error info and route to 'failure'
+            exc_type, exc_obj, tb = sys.exc_info()
+            self.logger.error("Failed to process FlowFile {}; will route to failure. Exception in '{}' at line {}: {}".format(flowfile.getAttribute('uuid'), tb.tb_frame.f_code.co_filename, tb.tb_lineno, str(e)))
+            session.remove(newflowfile)
+            session.transfer(flowfile, self._rel_failure)
+            session.commit()
+            return
+
+        # Success
+        newflowfile = session.putAllAttributes(newflowfile, attributes)
+        session.remove(flowfile)
+        session.transfer(newflowfile, self._rel_success)
+        session.adjustCounter('Records Processed', recordCount, False)
+        session.commit()
+        self.logger.debug('Successfully processed {} records in FlowFile {}'.format(recordCount, flowfile.getAttribute('uuid')))
+
+
+    def process_record(self, record):
+        """
+        Processing of individual record - domain probability computation
+
+        Get domain from record, compute probability using the FreqCounter,
+        and store the resulting probabilities to the record.
+        """
+        text = record.getValue(self.input_field_name)
+        if text is None:
+            raise ValueError("Can't get value of '{}' field".format(self.input_field_name))
+
+        prob1, prob2 = self.fc.probability(text)
+
+        record.setValue(self.result1_field_name, prob1)
+        record.setValue(self.result2_field_name, prob2)
+
+
+processor = FreqProcessor()
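Both variants end by assigning an instance to the module-level `processor` variable, which is the hook NiFi's InvokeScriptedProcessor uses to pick up the processor object from a Jython script. Since process_record() only touches the record through getValue()/setValue(), the frequency scoring can also be exercised outside NiFi with a small stub. A hypothetical smoke test - the stub class and the table path are made up; it assumes freq.py's FreqCounter.load() and probability() behave as used above:

    # Hypothetical stand-alone check of the per-record logic (not part of the patch).
    from freq import FreqCounter

    class StubRecord(object):
        """Mimics the subset of NiFi's Record API used by process_record()."""
        def __init__(self, values):
            self.values = values
        def getValue(self, name):
            return self.values.get(name)
        def setValue(self, name, value):
            self.values[name] = value

    fc = FreqCounter()
    fc.load("table.freq")  # assumed path to a frequency table created by freq.py

    rec = StubRecord({"domain": "example.com"})
    prob1, prob2 = fc.probability(rec.getValue("domain"))
    rec.setValue("prob1", prob1)
    rec.setValue("prob2", prob2)
    print(rec.values)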