Skip to content
Snippets Groups Projects
Commit 10dcf66d authored by Václav Bartoš's avatar Václav Bartoš
Browse files

Updated for NiFi 1.11.3

parent 6e00797b
No related branches found
No related tags found
No related merge requests found
#! /usr/bin/python #! /usr/bin/python
# This is a version for NiFi v.1.11.3
import sys import sys
import traceback import traceback
...@@ -55,6 +56,9 @@ class FreqProcessor(AbstractProcessor): ...@@ -55,6 +56,9 @@ class FreqProcessor(AbstractProcessor):
def onPropertyModified(self, descriptor, newValue, oldValue): def onPropertyModified(self, descriptor, newValue, oldValue):
pass pass
def onStopped(self, context):
pass
def onTrigger(self, context, sessionFactory): def onTrigger(self, context, sessionFactory):
session = sessionFactory.createSession() session = sessionFactory.createSession()
if not self.fc_loaded: if not self.fc_loaded:
...@@ -62,7 +66,7 @@ class FreqProcessor(AbstractProcessor): ...@@ -62,7 +66,7 @@ class FreqProcessor(AbstractProcessor):
# (Note: this cannot be done in initialize() since the context there doesn't contain the getProperty() method) # (Note: this cannot be done in initialize() since the context there doesn't contain the getProperty() method)
self.fc.load(str(context.getProperty("Frequency File").evaluateAttributeExpressions().getValue())) self.fc.load(str(context.getProperty("Frequency File").evaluateAttributeExpressions().getValue()))
self.fc_loaded=True self.fc_loaded=True
self.logger.info("Sucessfully loaded frequency file") self.logger.debug("Sucessfully loaded frequency file")
# Get field names to work with (set in configuration) # Get field names to work with (set in configuration)
# (remove leading '/' if present to simulate trivial support of RecordPath) # (remove leading '/' if present to simulate trivial support of RecordPath)
...@@ -84,7 +88,8 @@ class FreqProcessor(AbstractProcessor): ...@@ -84,7 +88,8 @@ class FreqProcessor(AbstractProcessor):
recordCount = 0 recordCount = 0
input_stream = session.read(flowfile) input_stream = session.read(flowfile)
reader = readerFactory.createRecordReader(originalAttributes, input_stream, self.logger) # ref: https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java
reader = readerFactory.createRecordReader(originalAttributes, input_stream, -1, self.logger)
# Create a new FlowFile to write the result to # Create a new FlowFile to write the result to
# (create() method copies the attributes from the original one, but not the content) # (create() method copies the attributes from the original one, but not the content)
...@@ -98,6 +103,9 @@ class FreqProcessor(AbstractProcessor): ...@@ -98,6 +103,9 @@ class FreqProcessor(AbstractProcessor):
# callbacks and method overloading, but overloading doesn't # callbacks and method overloading, but overloading doesn't
# work in Python, only one of the Java methods is available # work in Python, only one of the Java methods is available
# in Python). # in Python).
# Later note: That's not true, overloading should be resolved
# automatically by the number of parameters and their types.
# -> try it again
newflowfile = session.create(flowfile) newflowfile = session.create(flowfile)
output_stream = session.write(newflowfile) output_stream = session.write(newflowfile)
...@@ -107,12 +115,13 @@ class FreqProcessor(AbstractProcessor): ...@@ -107,12 +115,13 @@ class FreqProcessor(AbstractProcessor):
# an updated schema to the Record Writer. If there are no records, then we can # an updated schema to the Record Writer. If there are no records, then we can
# simply create the Writer with the Reader's schema and begin & end the RecordSet # simply create the Writer with the Reader's schema and begin & end the RecordSet
record = reader.nextRecord() record = reader.nextRecord()
#self.logger.debug("Record read (first in batch): " + record.encode('utf-8')) #self.logger.debug("Record read (first in batch): " + unicode(record, encoding='utf-8'))
if not record: if not record:
# There is no record, just create the Writer with the Reader's schema and begin & end the RecordSet # There is no record, just create the Writer with the Reader's schema and begin & end the RecordSet
self.logger.debug("FlowFile with no record") self.logger.debug("FlowFile with no record")
writeSchema = writerFactory.getSchema(originalAttributes, reader.schema) writeSchema = writerFactory.getSchema(originalAttributes, reader.schema)
writer = writerFactory.createWriter(self.logger, writeSchema, output_stream) # ref: https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
writer = writerFactory.createWriter(self.logger, writeSchema, output_stream, None)
try: try:
writer.beginRecordSet() writer.beginRecordSet()
writeResult = writer.finishRecordSet() writeResult = writer.finishRecordSet()
...@@ -127,15 +136,25 @@ class FreqProcessor(AbstractProcessor): ...@@ -127,15 +136,25 @@ class FreqProcessor(AbstractProcessor):
# There are some records to process ... # There are some records to process ...
# Add new fields to the schema # Add new fields to the schema
#self.logger.debug("origAttributes: " + str(originalAttributes))
oldSchema = writerFactory.getSchema(originalAttributes, record.schema) oldSchema = writerFactory.getSchema(originalAttributes, record.schema)
fields = list(oldSchema.getFields()) fields = oldSchema.getFields()
fields.append(RecordField(self.result1_field_name, RecordFieldType.FLOAT.getDataType())) field_names = [f.getFieldName() for f in fields]
fields.append(RecordField(self.result2_field_name, RecordFieldType.FLOAT.getDataType())) # ref: https://github.com/apache/nifi/blob/master/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
newSchema = SimpleRecordSchema(fields) if self.result1_field_name not in field_names:
writer = writerFactory.createWriter(self.logger, newSchema, output_stream) fields.append(RecordField(self.result1_field_name, RecordFieldType.FLOAT.getDataType(), False))
if self.result2_field_name not in field_names:
fields.append(RecordField(self.result2_field_name, RecordFieldType.FLOAT.getDataType(), False))
newSchema = SimpleRecordSchema(fields, oldSchema.getIdentifier())
# Create writer
# ref: https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
writer = writerFactory.createWriter(self.logger, newSchema, output_stream, None)
try: try:
# write record; process and write all further records in a loop...
writer.beginRecordSet() writer.beginRecordSet()
writer.write(record) writer.write(record)
record = reader.nextRecord() record = reader.nextRecord()
while record: while record:
#self.logger.debug("Record read: " + unicode(record, encoding='utf-8')) #self.logger.debug("Record read: " + unicode(record, encoding='utf-8'))
......
#! /usr/bin/python
# This is a version for NiFi v.1.8.0
import sys
import traceback
from org.apache.nifi.processor import AbstractProcessor
from org.apache.nifi.processor import Relationship
from org.apache.nifi.components import PropertyDescriptor
from org.apache.nifi.expression import ExpressionLanguageScope
from org.apache.nifi.processor.util import StandardValidators
from org.apache.nifi.serialization import RecordReaderFactory, RecordSetWriterFactory, SimpleRecordSchema
from org.apache.nifi.serialization.record import RecordField, RecordFieldType
import os
from freq import *
class FreqProcessor(AbstractProcessor):
    """Scripted NiFi processor (for InvokeScriptedProcessor, NiFi v1.8.0).

    Reads records from an incoming FlowFile, computes two character-frequency
    probabilities for a configured string field using FreqCounter (freq.py),
    stores them into two configured result fields, and writes all records to a
    new FlowFile routed to 'success'. On any error the original FlowFile is
    routed to 'failure' and the partial result is dropped.
    """

    # Relationship descriptors
    _rel_success = Relationship.Builder().description("Success").name("success").build()
    _rel_failure = Relationship.Builder().description("Failure").name("failure").build()

    # Property descriptors
    # Record reader and writer services to use
    _record_reader = PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for reading incoming data").identifiesControllerService(RecordReaderFactory).required(True).build()
    _record_writer = PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing out the records").identifiesControllerService(RecordSetWriterFactory).required(True).build()
    # Record field to get the string to be analyzed from
    _input_field = PropertyDescriptor.Builder().name("Input Field").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(True).build()
    # Record fields to store the results (prob1, prob2) into
    _result_field1 = PropertyDescriptor.Builder().name("Result Field 1").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(True).build()
    _result_field2 = PropertyDescriptor.Builder().name("Result Field 2").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(True).build()
    # File with character frequency table (as created by freq.py)
    _freq_file = PropertyDescriptor.Builder().name("Frequency File").addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(True).build()

    def __init__(self):
        # FreqCounter instance; the frequency table is loaded lazily on the
        # first onTrigger() call (see the note there for why).
        self.fc = FreqCounter()
        self.fc_loaded = False

    def initialize(self, context):
        self.logger = context.getLogger()

    def setLogger(self, logger):
        self.logger = logger

    def getRelationships(self):
        return set([self._rel_success, self._rel_failure])

    def validate(self, context):
        pass

    def getPropertyDescriptors(self):
        return [self._record_reader, self._record_writer,
                self._input_field, self._result_field1, self._result_field2,
                self._freq_file]

    def onPropertyModified(self, descriptor, newValue, oldValue):
        pass

    def onTrigger(self, context, sessionFactory):
        """Process one FlowFile: annotate each record with the two probabilities."""
        session = sessionFactory.createSession()
        if not self.fc_loaded:
            # Load character frequency table from given file
            # (Note: this cannot be done in initialize() since the context there doesn't contain the getProperty() method)
            self.fc.load(str(context.getProperty("Frequency File").evaluateAttributeExpressions().getValue()))
            self.fc_loaded = True
            # FIX: was "Sucessfully"
            self.logger.info("Successfully loaded frequency file")
        # Get field names to work with (set in configuration)
        # (remove leading '/' if present to simulate trivial support of RecordPath)
        self.input_field_name = context.getProperty("Input Field").evaluateAttributeExpressions().getValue().lstrip('/')
        self.result1_field_name = context.getProperty("Result Field 1").evaluateAttributeExpressions().getValue().lstrip('/')
        self.result2_field_name = context.getProperty("Result Field 2").evaluateAttributeExpressions().getValue().lstrip('/')
        # Pre-initialize so the error handler below can tell whether the output
        # FlowFile was already created (otherwise session.remove() would raise
        # NameError and mask the original exception).
        newflowfile = None
        try:
            flowfile = session.get()
            if flowfile is None:
                return
            self.logger.debug("Processing FlowFile {}".format(flowfile.getAttribute('uuid')))
            readerFactory = context.getProperty(self._record_reader).asControllerService(RecordReaderFactory)
            writerFactory = context.getProperty(self._record_writer).asControllerService(RecordSetWriterFactory)
            originalAttributes = flowfile.attributes
            attributes = {}
            recordCount = 0
            input_stream = session.read(flowfile)
            reader = readerFactory.createRecordReader(originalAttributes, input_stream, self.logger)
            # Create a new FlowFile to write the result to
            # (create() method copies the attributes from the original one, but not the content)
            # TODO: This results in FORK and DROP events in Data Provenance log.
            #       I would like to get CONTENT MODIFIED log (as UpdateRecord
            #       processor does) but can't find a way to do it.
            #       I can't open InputStream and OutputStream at the same time
            #       on single flowfile - that's why I need to make a new one.
            #       It probably should be done by InputOutputStream, but it
            #       seems there's no way in Python to get it (in Java it uses
            #       callbacks and method overloading, but overloading doesn't
            #       work in Python, only one of the Java methods is available
            #       in Python).
            newflowfile = session.create(flowfile)
            output_stream = session.write(newflowfile)
            try:
                # Get the first record and process it before we create the Record Writer.
                # We do this so that if the Processor updates the Record's schema, we can provide
                # an updated schema to the Record Writer. If there are no records, then we can
                # simply create the Writer with the Reader's schema and begin & end the RecordSet
                record = reader.nextRecord()
                if not record:
                    # There is no record, just create the Writer with the Reader's schema and begin & end the RecordSet
                    self.logger.debug("FlowFile with no record")
                    writeSchema = writerFactory.getSchema(originalAttributes, reader.schema)
                    writer = writerFactory.createWriter(self.logger, writeSchema, output_stream)
                    try:
                        writer.beginRecordSet()
                        writeResult = writer.finishRecordSet()
                        attributes['record.count'] = str(writeResult.recordCount)
                        attributes.update(writeResult.attributes)
                        recordCount = writeResult.recordCount
                    finally:
                        writer.close()
                else:
                    # process first record
                    self.process_record(record)
                    # There are some records to process ...
                    # Add the result fields to the schema, skipping any that the
                    # input schema already contains (appending unconditionally
                    # would produce duplicate fields in the output schema).
                    oldSchema = writerFactory.getSchema(originalAttributes, record.schema)
                    fields = list(oldSchema.getFields())
                    present_names = set(f.getFieldName() for f in fields)
                    if self.result1_field_name not in present_names:
                        fields.append(RecordField(self.result1_field_name, RecordFieldType.FLOAT.getDataType()))
                    if self.result2_field_name not in present_names:
                        fields.append(RecordField(self.result2_field_name, RecordFieldType.FLOAT.getDataType()))
                    newSchema = SimpleRecordSchema(fields)
                    writer = writerFactory.createWriter(self.logger, newSchema, output_stream)
                    try:
                        # Write the first record, then process and write all
                        # remaining records in a loop.
                        writer.beginRecordSet()
                        writer.write(record)
                        record = reader.nextRecord()
                        while record:
                            #self.logger.debug("Record read: " + unicode(record, encoding='utf-8'))
                            # process record
                            self.process_record(record)
                            writer.write(record)
                            record = reader.nextRecord()
                        writeResult = writer.finishRecordSet()
                        attributes['record.count'] = str(writeResult.recordCount)
                        attributes.update(writeResult.attributes)
                        recordCount = writeResult.recordCount
                    finally:
                        writer.close()
            finally:
                reader.close()
                input_stream.close()
                output_stream.close()
        except Exception as e:
            # Failure - print error info, drop the partial result and route the original to 'failure'
            exc_type, exc_obj, tb = sys.exc_info()
            self.logger.error("Failed to process FlowFile {}; will route to failure. Exception in '{}' at line {}: {}".format(flowfile.getAttribute('uuid'), tb.tb_frame.f_code.co_filename, tb.tb_lineno, str(e)))
            if newflowfile is not None:
                session.remove(newflowfile)
            session.transfer(flowfile, self._rel_failure)
            session.commit()
            return
        # Success
        newflowfile = session.putAllAttributes(newflowfile, attributes)
        session.remove(flowfile)
        session.transfer(newflowfile, self._rel_success)
        session.adjustCounter('Records Processed', recordCount, False)
        session.commit()
        self.logger.debug('Successfully processed {} records in FlowFile {}'.format(recordCount, flowfile.getAttribute('uuid')))

    def process_record(self, record):
        """
        Processing of individual record - domain probability computation

        Get domain from record, compute probability using the FreqCounter,
        and store the resulting probabilities to the record.

        Raises ValueError when the configured input field is missing/None.
        """
        text = record.getValue(self.input_field_name)
        if text is None:
            raise ValueError("Can't get value of '{}' field".format(self.input_field_name))
        prob1, prob2 = self.fc.probability(text)
        record.setValue(self.result1_field_name, prob1)
        record.setValue(self.result2_field_name, prob2)
# Module-level instance picked up by the NiFi scripting host.
# NOTE(review): the hosting component appears to look up a variable named
# 'processor' — confirm against the InvokeScriptedProcessor configuration.
processor = FreqProcessor()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment