Skip to content
Snippets Groups Projects
Commit 6e00797b authored by Václav Bartoš
Browse files

freq: added support for attribute expressions, made leading '/' in field names optional

parent a2b20835
Branches
No related tags found
No related merge requests found
...@@ -5,6 +5,7 @@ import traceback ...@@ -5,6 +5,7 @@ import traceback
from org.apache.nifi.processor import AbstractProcessor from org.apache.nifi.processor import AbstractProcessor
from org.apache.nifi.processor import Relationship from org.apache.nifi.processor import Relationship
from org.apache.nifi.components import PropertyDescriptor from org.apache.nifi.components import PropertyDescriptor
from org.apache.nifi.expression import ExpressionLanguageScope
from org.apache.nifi.processor.util import StandardValidators from org.apache.nifi.processor.util import StandardValidators
from org.apache.nifi.serialization import RecordReaderFactory, RecordSetWriterFactory, SimpleRecordSchema from org.apache.nifi.serialization import RecordReaderFactory, RecordSetWriterFactory, SimpleRecordSchema
from org.apache.nifi.serialization.record import RecordField, RecordFieldType from org.apache.nifi.serialization.record import RecordField, RecordFieldType
...@@ -23,12 +24,12 @@ class FreqProcessor(AbstractProcessor): ...@@ -23,12 +24,12 @@ class FreqProcessor(AbstractProcessor):
_record_reader = PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for reading incoming data").identifiesControllerService(RecordReaderFactory).required(True).build() _record_reader = PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for reading incoming data").identifiesControllerService(RecordReaderFactory).required(True).build()
_record_writer = PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing out the records").identifiesControllerService(RecordSetWriterFactory).required(True).build() _record_writer = PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing out the records").identifiesControllerService(RecordSetWriterFactory).required(True).build()
# Record field to get the string to be analyzed from # Record field to get the string to be analyzed from
_input_field = PropertyDescriptor.Builder().name("Input Field").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(True).build() _input_field = PropertyDescriptor.Builder().name("Input Field").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(True).build()
# Record fields to store the results (prob1, prob2) into # Record fields to store the results (prob1, prob2) into
_result_field1 = PropertyDescriptor.Builder().name("Result Field 1").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(True).build() _result_field1 = PropertyDescriptor.Builder().name("Result Field 1").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(True).build()
_result_field2 = PropertyDescriptor.Builder().name("Result Field 2").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(True).build() _result_field2 = PropertyDescriptor.Builder().name("Result Field 2").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(True).build()
# File with character frequency table (as created by freq.py) # File with character frequency table (as created by freq.py)
_freq_file = PropertyDescriptor.Builder().name("Frequency File").addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).required(True).build() _freq_file = PropertyDescriptor.Builder().name("Frequency File").addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(True).build()
def __init__(self): def __init__(self):
self.fc = FreqCounter() self.fc = FreqCounter()
...@@ -59,14 +60,15 @@ class FreqProcessor(AbstractProcessor): ...@@ -59,14 +60,15 @@ class FreqProcessor(AbstractProcessor):
if not self.fc_loaded: if not self.fc_loaded:
# Load character frequency table from given file # Load character frequency table from given file
# (Note: this cannot be done in initialize() since the context there doesn't contain the getProperty() method) # (Note: this cannot be done in initialize() since the context there doesn't contain the getProperty() method)
self.fc.load(str(context.getProperty("Frequency File").getValue())) self.fc.load(str(context.getProperty("Frequency File").evaluateAttributeExpressions().getValue()))
self.fc_loaded=True self.fc_loaded=True
self.logger.info("Sucessfully loaded frequency file") self.logger.info("Sucessfully loaded frequency file")
# Get field names to work with (set in configuration) # Get field names to work with (set in configuration)
self.input_field_name = context.getProperty("Input Field").getValue()[1:] # (remove leading '/' if present to simulate trivial support of RecordPath)
self.result1_field_name = context.getProperty("Result Field 1").getValue()[1:] self.input_field_name = context.getProperty("Input Field").evaluateAttributeExpressions().getValue().lstrip('/')
self.result2_field_name = context.getProperty("Result Field 2").getValue()[1:] self.result1_field_name = context.getProperty("Result Field 1").evaluateAttributeExpressions().getValue().lstrip('/')
self.result2_field_name = context.getProperty("Result Field 2").evaluateAttributeExpressions().getValue().lstrip('/')
try: try:
flowfile = session.get() flowfile = session.get()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment