Thursday, February 1, 2018

InvokeScriptedProcessor template revisited (with Javascript)

I recently got a request on Google+ for a Javascript/Nashorn example of InvokeScriptedProcessor. Since I've already done articles on an InvokeScriptedProcessor template (to port from ExecuteScript) for Groovy and Jython, I thought I'd do the same for Javascript. The template also illustrates the methods you'd have to override in order to write your own InvokeScriptedProcessor that does more powerful/flexible things.

Javascript/Nashorn is a slightly peculiar animal in terms of its ScriptEngine. In Groovy and Jython you need to create a subclass of Processor, implement the requisite methods, and assign an instance of the subclass to the variable "processor". The same is true of Javascript, except in order to invoke methods on the "processor" object, it has to be an instance of ScriptObject (an internal Nashorn class) or ScriptObjectMirror (a class of the Nashorn scripting API). In our context, the script body/file of InvokeScriptedProcessor is the thing that gets evaluated and cast as a ScriptObjectMirror, which means we need a slightly different approach than just creating a subclass and setting "processor" to an instance of it.  Instead the script itself has to be able to be cast as a Processor, so it can be a ScriptObjectMirror and Processor at the same time.

To that end, we declare the Processor interface methods (and its inherited interface methods) as functions on the main script, and then we set the "processor" variable to "this". Another difference from the other scripting language examples is that you need variable access to the various Java classes (both in Java proper and the NiFi API) before you can instantiate them. So in the "imports" section you will see a number of Java.type() calls, to get JS references to the Java classes.  You will have to do the same if you reference other Java classes in your executeScript() body.

Speaking of which, I tried to keep the same approach as in the other templates: there is an obvious place to paste your ExecuteScript code into the InvokeScriptedProcessor template, which makes porting from ExecuteScript to InvokeScriptedProcessor easier. The template follows:

// "imports" go here

var Processor = Java.type("org.apache.nifi.processor.Processor");
var Relationship =  Java.type("org.apache.nifi.processor.Relationship");
var HashSet =  Java.type("java.util.HashSet");
var log = null;
var REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build();
var REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build();

function executeScript(session, context, log, REL_SUCCESS, REL_FAILURE) {
   // your code goes here
}

function initialize(context) { log = context.logger; }
function getRelationships() {
    var r = new HashSet();
    r.add(REL_FAILURE);
    r.add(REL_SUCCESS);
    return r;
}
function validate(context) { return null; }
function getPropertyDescriptor(name) { return null; }
function onPropertyModified(descriptor, oldValue, newValue) { return null; }
function getPropertyDescriptors() { return null; }
function getIdentifier() { return null; }
function onTrigger(context, sessionFactory) {
    var session = sessionFactory.createSession();
    try {
        executeScript(session, context, log, REL_SUCCESS, REL_FAILURE);
        session.commit();
    } catch (t) {
        // Java.to() converts the JS array into the Object[] the logger expects
        log.error("{} failed to process due to {}; rolling back session", Java.to([this, t], "java.lang.Object[]"));
        session.rollback(true);
        throw t;
    }
}

processor = this;
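
To give an idea of what might go into executeScript(), here is a rough sketch of a body that tags each incoming flow file with an attribute and routes it to success. The calls session.get(), session.putAttribute() and session.transfer() are ProcessSession methods; the attribute name "isp.example" and the stub session at the bottom are made up for illustration, and the stub exists only so the snippet can run outside NiFi. In the template you would paste just the function body:

```javascript
// Example body for executeScript(): tag the incoming flow file and route it to success.
// The attribute name "isp.example" is hypothetical.
function executeScript(session, context, log, REL_SUCCESS, REL_FAILURE) {
    var flowFile = session.get();
    if (flowFile == null) {
        return; // nothing queued on this trigger
    }
    flowFile = session.putAttribute(flowFile, "isp.example", "hello from ISP");
    session.transfer(flowFile, REL_SUCCESS);
}

// ---- Stand-ins so the sketch runs outside NiFi (hypothetical, NOT the NiFi API) ----
function makeStubSession(queue) {
    var transferred = [];
    return {
        transferred: transferred,
        get: function () { return queue.length > 0 ? queue.shift() : null; },
        putAttribute: function (flowFile, key, value) {
            // Return an updated copy, mirroring how putAttribute returns a new FlowFile reference
            var copy = { attributes: {} };
            for (var k in flowFile.attributes) { copy.attributes[k] = flowFile.attributes[k]; }
            copy.attributes[key] = value;
            return copy;
        },
        transfer: function (flowFile, relationship) {
            transferred.push({ flowFile: flowFile, relationship: relationship });
        }
    };
}

var stubSession = makeStubSession([{ attributes: { filename: "a.txt" } }]);
executeScript(stubSession, null, null, "success", "failure");
```

Note that the result of putAttribute() is assigned back to flowFile before the transfer, since the session hands back the latest version of the flow file.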

As always, please let me know how/if this works for you, and of course comments, questions, and suggestions are welcome.  Cheers!

Tuesday, November 7, 2017

InvokeScriptedProcessor template revisited (with Jython)

In a previous post, I provided a template in Groovy that would allow NiFi users to port their ExecuteScript Groovy scripts into the faster InvokeScriptedProcessor (ISP) processor. ISP is faster than ExecuteScript because the script is only reloaded when the code or other config changes, versus ExecuteScript which evaluates the script each time the processor is invoked.

Since that post, I've gotten a couple of requests (such as this one) for an ISP template written in Jython, so users that have ExecuteScript processors using Jython scripts can benefit from the ISP performance gains. Ask and ye shall receive :) The following Jython script is meant to be pasted into an InvokeScriptedProcessor's Script Body property, and there is a comment indicating where to add imports and the ExecuteScript code:

#// imports go here
from org.apache.nifi.processor import Processor,Relationship
from java.lang import Throwable

class E():
    def __init__(self):
        pass
    def executeScript(self,session, context, log, REL_SUCCESS, REL_FAILURE):
        #// Replace 'pass' with your code
        pass
#end class

class JythonProcessor(Processor):   
    REL_SUCCESS = Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
    REL_FAILURE = Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build()
    log = None
    e = E()
    def initialize(self,context):
        self.log = context.logger
    def getRelationships(self):
        return set([self.REL_SUCCESS, self.REL_FAILURE])
    def validate(self,context):
        pass
    def onPropertyModified(self,descriptor, oldValue, newValue):
        pass
    def getPropertyDescriptors(self):
        return []
    def getIdentifier(self):
        return None    
    def onTrigger(self,context, sessionFactory):
        session = sessionFactory.createSession()
        try:
            self.e.executeScript(session, context, self.log, self.REL_SUCCESS, self.REL_FAILURE)
            session.commit()
        except Throwable, t:
            self.log.error('{} failed to process due to {}; rolling back session', [self, t])
            session.rollback(True)
            raise t
#end class

processor = JythonProcessor()

Like the Groovy version, you just need to add your imports to the top of the file, and paste your ExecuteScript Jython code into the executeScript() method, replacing the "pass" line. As always, please let me know how/if this works for you, and if you have any comments, questions, or suggestions.  The script is available as a Gist also.  Cheers!

Wednesday, October 25, 2017

Testing NiFi Expression Language with Groovy

(This post is adapted from a Hortonworks Community Connection article I wrote)
Some NiFi Expression Language (EL) expressions can be fairly complex, or used in a large flow, or both. These can make it difficult to test an EL expression on a running NiFi system. Although an excellent feature of NiFi is being able to adapt the flow while the system is running, it may not be prudent to stop a downstream processor, reroute a connection to something like UpdateAttribute, then list the queue in order to see attributes, content, etc.
To make EL testing easier, I wrote a Groovy script called testEL.groovy that uses the same EL library that NiFi does, so all functions present in the specified NiFi version are available to the test tool. The following is the usage:
  usage: groovy testEL.groovy [options] [expressions]
  Options:
   -D <attribute=value>   set value for given attribute
   -help                  print this message
As an example, the following tests an expression that appends "_world" to the "filename" attribute:
  > groovy testEL.groovy -D filename=hello '${filename:append("_world")}'
  hello_world
Note that it accepts multiple attribute definitions and multiple expressions, so you can test more than one expression using a single set of attributes:
  > groovy testEL.groovy -D filename=hello -D size=10 '${filename:append("_world")}' '${filename:prepend("I say "):append(" ${size} times")}'
  hello_world
  I say hello 10 times
The script is as follows:
@Grab(group='org.apache.nifi', module='nifi-expression-language', version='1.4.0')
import org.apache.nifi.attribute.expression.language.*

def cli = new CliBuilder(usage:'groovy testEL.groovy [options] [expressions]',
                          header:'Options:')
cli.help('print this message')
cli.D(args:2, valueSeparator:'=', argName:'attribute=value',
       'set value for given attribute')
def options = cli.parse(args)
// No expressions given: show the usage text and exit
if(!options.arguments()) {
  cli.usage()
  return 1
}

def attrMap = [:]
def currKey = null
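// -D values arrive as a flat list: key, value, key, value, ...
options.Ds?.eachWithIndex {o,i ->
  if(i%2==0) {
    currKey = o
  } else {
    attrMap[currKey] = o
  }
}
// Compile and evaluate each expression against the attribute map
options.arguments()?.each {
  def q = Query.compile(it)
  println q.evaluate(attrMap ?: null)
}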

and is also available as a Gist.
Hopefully you find this script helpful. If you try it, please let me know how/if it works for you; as always, I welcome any questions, comments, and suggestions on how to make things better :)

Thursday, October 5, 2017

Release 1.2.0 of nifi-script-tester

I've just released version 1.2.0 of the nifi-script-tester, a utility that lets you test your Groovy, Jython, and Javascript scripts for use in the NiFi ExecuteScript processor.  Here are the new features:

- Upgraded code to NiFi 1.4.0
- Added support for incoming flow file attributes

For the first point, a lot of refactoring was done in the NiFi Scripting NAR in order to reuse code across various scripting components in NiFi, such as processors, controller services, record readers/writers, and reporting tasks.  Getting the codebase up to date will allow me to add new features such as the ability to test RecordReader/Writer scripts, ScriptedReportingTask scripts, etc.

For the second point, I'd been asked to add that support for a while so I finally got around to it :) There is now a new switch "attrfile" that lets you point to a Java properties file. These properties will be added to each flow file (whether coming from STDIN or the "input" switch). In the future I hope to add support for Expression Language, and perhaps figure out a way to specify a set of attributes per incoming flow file (rather than reusing one set for all, whether that set supports EL or not). The code is Apache-licensed and on GitHub; I welcome any pull requests :)

Here is the new usage info:

Usage: java -jar nifi-script-tester-<version>-all.jar [options] <script file>
 Where options may include:
   -success            Output information about flow files that were transferred to the success relationship. Defaults to true
   -failure            Output information about flow files that were transferred to the failure relationship. Defaults to false
   -no-success         Do not output information about flow files that were transferred to the success relationship. Defaults to false
   -content            Output flow file contents. Defaults to false
   -attrs              Output flow file attributes. Defaults to false
   -all-rels           Output information about flow files that were transferred to any relationship. Defaults to false
   -all                Output content, attributes, etc. about flow files that were transferred to any relationship. Defaults to false
   -input=<directory>  Send each file in the specified directory as a flow file to the script
   -modules=<paths>    Comma-separated list of paths (files or directories) containing script modules/JARs
   -attrfile=<paths>   Path to a properties file specifying attributes to add to incoming flow files.

If anyone gives this a try, please let me know how/if it works for you. As always, I welcome all questions, comments, and suggestions.  Cheers!

Monday, June 5, 2017

InvokeScriptedProcessor template (a faster ExecuteScript)

For quick, easy, and small scripting tasks in Apache NiFi, ExecuteScript is often a better choice than InvokeScriptedProcessor, as there is little-to-no boilerplate code, relationships and properties are already defined and supported, and some objects relevant to the NiFi API (such as the ProcessSession, ProcessContext, and ComponentLog) are already bound to the script engine as variables that can readily be used by the script.

However, one tradeoff is performance; in ExecuteScript, the script is evaluated each time onTrigger is executed. With InvokeScriptedProcessor, as long as the script (or any of the InvokeScriptedProcessor properties) is not changed, the scripted Processor instance is maintained by the processor, and its methods are simply invoked when parent methods such as onTrigger() are called by the NiFi framework.
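
To make the difference concrete, here is a toy model in plain Javascript (not NiFi code) that just counts how often the script text gets evaluated under each approach. The evaluateScript() helper is a hypothetical stand-in for the script engine's eval step, and the "script" is a trivial one-liner:

```javascript
// Toy model, not NiFi code: count how often the script text gets evaluated.
var evaluations = 0;

// Hypothetical stand-in for a script engine's eval step.
function evaluateScript(source) {
    evaluations++;
    return new Function("content", source);
}

var source = "return content.toUpperCase();";

// ExecuteScript style: the script text is evaluated on every trigger.
function runExecuteScriptStyle(inputs) {
    return inputs.map(function (content) {
        return evaluateScript(source)(content);
    });
}

// InvokeScriptedProcessor style: evaluate once, keep the result, and only
// invoke it on each trigger.
function runInvokeScriptedStyle(inputs) {
    var processor = evaluateScript(source);
    return inputs.map(function (content) {
        return processor(content);
    });
}

var inputs = ["a", "b", "c"];

evaluations = 0;
var perTriggerResults = runExecuteScriptStyle(inputs);
var perTriggerEvaluations = evaluations;

evaluations = 0;
var cachedResults = runInvokeScriptedStyle(inputs);
var cachedEvaluations = evaluations;

console.log(perTriggerEvaluations, cachedEvaluations); // prints: 3 1
```

Both approaches produce the same results; the only difference in this sketch is how many times the evaluation step runs.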

To get the best of both worlds, I have put together an InvokeScriptedProcessor instance that is configured the same way ExecuteScript is. The "success" and "failure" relationships are provided, the API objects are available, and if you simply paste your ExecuteScript code into the same spot in the below script, it will behave like a more performant ExecuteScript instance.  The code is as follows:

// imports go here

class E { void executeScript(session, context, log, REL_SUCCESS, REL_FAILURE) {
        // your code goes here
    }
}

class GroovyProcessor implements Processor {
    def REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
    def REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build()
    def ComponentLog log
    def e = new E()   
    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { null }
    String getIdentifier() { null }    
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session = sessionFactory.createSession()
        try {
            e.executeScript(session, context, log, REL_SUCCESS, REL_FAILURE)
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()

The boilerplate Processor implementation is at the bottom, and I've left comment blocks where your imports and code go. With some simple cut-and-paste, you should be able to have a pre-evaluated Processor instance that will run your ExecuteScript code faster than before!

If you give this a try, please let me know how/if it works for you. I am always open to suggestions, improvements, comments, and questions.  Cheers!

Tuesday, March 14, 2017

NiFi ExecuteScript Cookbook

Hello All!  Just wanted to write a quick post here to let you know about a series of articles I have written about ExecuteScript support for Apache NiFi, with discussions of how to do various tasks, and examples in various supported scripting languages. I posted them on Hortonworks Community Connection (HCC).  Full disclosure: I am a Hortonworks employee :)

Lua and ExecuteScript in NiFi (revisited)

I recently fielded a question about using Lua (actually, LuaJ) in NiFi's ExecuteScript processor to manipulate flow files. I had written a basic article on using LuaJ with ExecuteScript, but that example only shows how to create new flow files; it does not address accepting incoming flow files or manipulating the data.

To rectify that I answered the question with the following example script:

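flowFile = session:get()
if flowFile == nil then
  return
end

local writecb =
luajava.createProxy("org.apache.nifi.processor.io.StreamCallback", {
    process = function(inputStream, outputStream)
      local isr = luajava.newInstance('java.io.InputStreamReader', inputStream)
      local br = luajava.newInstance('java.io.BufferedReader', isr)
      local line = br:readLine()
      while line ~= nil do
         -- Do stuff to each line here (this example reverses it)
         outputStream:write(line:reverse())
         line = br:readLine()
         -- Only add the newline back if more lines remain
         if line ~= nil then
           outputStream:write("\n")
         end
      end
    end
})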
flowFile = session:putAttribute(flowFile, "lua.attrib", "my attribute value")
flowFile = session:write(flowFile, writecb)
session:transfer(flowFile, REL_SUCCESS)

Readers of my last LuaJ post will recognize the approach, using luajava.createProxy() to basically create an anonymous class instance of a NiFi Callback class, then providing (aka "overriding") the requisite interface method (in this case, the "process" method).

The first difference here is that I'm using the StreamCallback class instead of the OutputStreamCallback class in my previous example. You may recall that OutputStreamCallback only allows you to write to a flow file, whereas StreamCallback is for overwriting existing flow file content, by making both the input stream of the current version of the flow file, as well as the output stream for the next version of the flow file, available in the process() method.

You may also recall that my scripting examples often use Apache Commons' IOUtils class to read the entire flow file content in as a string, then manipulate it after the fact. However, LuaJ has a bug where it only uses the system classloader and thus won't have access to the additional classes provided to the scripting NAR.  So for this example I am wrapping the incoming InputStream in an InputStreamReader, then a BufferedReader, so I can proceed line-by-line.  I reverse each line, and if there are lines remaining, I add the newline back to the output stream.
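
If it helps to see that per-line logic outside of NiFi, here is a minimal Javascript sketch of the same idea working on a plain string instead of streams. The function name reverseLines is made up for this example, and it only approximates what the BufferedReader loop does (for instance, it reverses by UTF-16 code unit):

```javascript
// Sketch of the line-by-line logic on a plain string (hypothetical helper, not NiFi code).
function reverseLines(text) {
    // Split on the same line terminators BufferedReader.readLine() recognizes
    var lines = text.split(/\r\n|\r|\n/);
    // readLine() does not report an empty final line after a trailing terminator
    if (lines.length > 0 && lines[lines.length - 1] === "") {
        lines.pop();
    }
    var out = "";
    for (var i = 0; i < lines.length; i++) {
        out += lines[i].split("").reverse().join("");
        // Add the newline back only if more lines remain
        if (i < lines.length - 1) {
            out += "\n";
        }
    }
    return out;
}

console.log(reverseLines("abc\ndef")); // prints "cba" and "fed" on two lines
```

One consequence of writing the newline only between lines is that a trailing newline in the input does not appear in the output.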

If you are simply reading in the content of a flow file, and won't be overwriting that flow file content, you can use InputStreamCallback instead of StreamCallback. InputStreamCallback's process() method gives you only the input stream of the incoming flow file. Some people use a session.read() with an InputStreamCallback to handle the incoming flow file(s), then later use a session.write() with an OutputStreamCallback, rather than a single session.write() with a StreamCallback as is shown above. The common use case for the alternate approach is when you send the original flow file to an "original" relationship, but also write out new flow files based on the content of the incoming one(s). Many "Split" processors do this.

Anyway, I hope this example is informative and shows how to use Lua scripting in NiFi to perform custom logic.  As always, I welcome all comments, questions, and suggestions.  Cheers!