Monday, March 14, 2016

ExecuteScript - JSON-to-JSON Revisited (with Jython)

I've received some good comments about a couple of previous blog posts on using the ExecuteScript processor in NiFi (0.5.0+) to perform JSON-to-JSON transformations. One post used Groovy and the other used Javascript.

Since then I've received some requests for a Jython example for ExecuteScript, so I figured I'd do the same use case again (JSON to JSON) so folks can see the differences in the languages when performing the same operations :)

The approach has been covered in detail in the other posts, so I will talk a bit about the Jython-specific stuff and then get right to the code.

One major caveat here is that I don't know Python :)  I learned enough to get the script working, but please let me know how to better do any of this. I stopped touching the script once it worked, so it's very possible there are unnecessary imports, classes, etc.

The first thing to do is import the Jython libraries you will need, along with the Java and NiFi classes to be used:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

I didn't need to import java.lang.String because Jython coerces types between Python and Java. I probably couldn't call getBytes() on a Python string unless Jython knew to coerce the object to a Java String, but that's OK because we can call bytearray("myString".encode('utf-8')) to achieve the same result.
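
To see that conversion on its own, here is a minimal sketch in plain Python (run outside NiFi). The sample string is made up for illustration; the expression is the same one the script later hands to outputStream.write().

```python
# Encode a text string as UTF-8 and wrap the result in a bytearray.
# The sample text is an assumption; it includes one non-ASCII character
# so the character count and the byte count differ.
text = u"caf\u00e9"
data = bytearray(text.encode('utf-8'))

print(len(text))   # number of characters in the string
print(len(data))   # number of bytes after UTF-8 encoding
```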

The next task was to create a StreamCallback object for use in session.write(). I created a Jython class for this and overrode the interface method:
class PyStreamCallback(StreamCallback):
  def __init__(self):
    pass
  def process(self, inputStream, outputStream):
    # ...
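
If it helps to see the callback pattern without NiFi in the way, here is a rough sketch in plain Python. Everything in it is a stand-in: the StreamCallback base class, the UpperCaseCallback example, and the fake_session_write helper are hypothetical names I made up to imitate how the callback gets an input stream over the current content and an output stream for the new content. In-memory BytesIO objects play the part of the Java streams.

```python
import io

class StreamCallback(object):
    """Stand-in for the NiFi StreamCallback interface (assumption, for running outside NiFi)."""
    def process(self, inputStream, outputStream):
        raise NotImplementedError

class UpperCaseCallback(StreamCallback):
    """Hypothetical callback: reads all of the input and writes it back upper-cased."""
    def process(self, inputStream, outputStream):
        text = inputStream.read().decode('utf-8')
        outputStream.write(text.upper().encode('utf-8'))

def fake_session_write(content, callback):
    """Hypothetical helper that imitates passing a callback to session.write()."""
    source = io.BytesIO(content)     # existing content to read from
    sink = io.BytesIO()              # collects the replacement content
    callback.process(source, sink)
    return sink.getvalue()

result = fake_session_write(b"hello nifi", UpperCaseCallback())
print(result)
```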

After that, I read in and parsed the JSON text with IOUtils then json.loads(), then performed all the operations on the various parts of the object/dictionary. Finally I generated a new JSON string with json.dumps(), encoded to UTF-8, got the byte array, and wrote it to the output stream.
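
Here is a sketch of just those parse, transform, and serialize steps as a plain Python function, so they can be tried outside NiFi. The function name translate and the sample input are assumptions for illustration (the sample only mimics the shape the script expects), and it uses items() rather than iteritems() so it also runs on standard Python 3.

```python
import json

def translate(text):
    """Parse the input JSON, reshape it, and return the new JSON as UTF-8 bytes."""
    obj = json.loads(text)
    new_obj = {
        "Range": 5,
        "Rating": obj['rating']['primary']['value'],
        "SecondaryRatings": {}
    }
    # Every rating other than "primary" becomes a secondary rating
    for key, value in obj['rating'].items():
        if key != "primary":
            new_obj['SecondaryRatings'][key] = {"Id": key, "Range": 5, "Value": value['value']}
    return bytearray(json.dumps(new_obj, indent=4).encode('utf-8'))

# Made-up sample input with one primary and one secondary rating
sample = '{"rating": {"primary": {"value": 3}, "quality": {"value": 4}}}'
print(translate(sample).decode('utf-8'))
```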

The resulting script is as follows:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
  def __init__(self):
    pass
  def process(self, inputStream, outputStream):
    # Read the entire flow file content as a UTF-8 string and parse it
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    obj = json.loads(text)
    # Build the output structure around the primary rating
    newObj = {
      "Range": 5,
      "Rating": obj['rating']['primary']['value'],
      "SecondaryRatings": {}
    }
    # Every rating other than "primary" becomes a secondary rating
    for key, value in obj['rating'].iteritems():
      if key != "primary":
        newObj['SecondaryRatings'][key] = {"Id": key, "Range": 5, "Value": value['value']}

    # Serialize, encode as UTF-8, and overwrite the flow file content
    outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
  flowFile = session.write(flowFile,PyStreamCallback())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
  session.transfer(flowFile, REL_SUCCESS)
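
One detail that may be worth seeing in isolation is how the new filename attribute is built. This is a small sketch in plain Python; the helper name translated_name and the sample filenames are made up. Note that split('.')[0] keeps only the text before the first dot, so a name with several dots loses everything after the first one.

```python
def translated_name(filename):
    # Same expression as in the script: keep the text before the first '.'
    # and append the new suffix.
    return filename.split('.')[0] + '_translated.json'

print(translated_name('input.json'))
print(translated_name('ratings.2016.json'))
```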

The template is available as a Gist (here). I welcome all suggestions on how to make this better, and please share any scripts you come up with!

Cheers!

Wednesday, March 2, 2016

ExecuteScript - JSON-to-JSON Revisited (with Javascript)

A question came in on the Apache NiFi users group about doing things with ExecuteScript using something other than Groovy (specifically, Javascript). I put a fairly trivial example inside a previous post, but it doesn't cover one of the most important features: overwriting flow file content.

One major difference between Groovy and Javascript here is that you will want to get a reference to Java objects using Java.type(), in order to create new objects, invoke static methods, etc. I use the following in the script below:

var StreamCallback =  Java.type("org.apache.nifi.processor.io.StreamCallback")
var IOUtils = Java.type("org.apache.commons.io.IOUtils")
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")

Also, the Javascript engine has something very much like closure coercion in Groovy. If you are creating a new anonymous object from a class/interface, and that class/interface has a single method, you can provide a Javascript function (with the appropriate arguments) in the object constructor. So to create an implementation of StreamCallback (used in session.write() to overwrite the contents of a flowfile), you have:

new StreamCallback(function(inputStream, outputStream) { /* Do stuff */ })

With that in mind, I recreated the JSON-to-JSON example from my previous post, this time using Javascript as the language.  The script is as follows:

var flowFile = session.get();
if (flowFile != null) {

  var StreamCallback =  Java.type("org.apache.nifi.processor.io.StreamCallback")
  var IOUtils = Java.type("org.apache.commons.io.IOUtils")
  var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")

  flowFile = session.write(flowFile,
    new StreamCallback(function(inputStream, outputStream) {
        var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        var obj = JSON.parse(text)
        var newObj = {
          "Range": 5,
          "Rating": obj.rating.primary.value,
          "SecondaryRatings": {}
        }
        for(var key in obj.rating){
          var attrName = key;
          var attrValue = obj.rating[key];
          if(attrName != "primary") {
            newObj.SecondaryRatings[attrName] = {"id": attrName, "Range": 5, "Value": attrValue.value};
          }
        }
        outputStream.write(JSON.stringify(newObj, null, '\t').getBytes(StandardCharsets.UTF_8)) 
    }))
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
  session.transfer(flowFile, REL_SUCCESS)
}

The template is available as a Gist (here); please let me know how/if this works for you.  Cheers!

UPDATE: This script works with Nashorn (Java 8's Javascript engine), but doesn't work if you're using Java 7 (Rhino).  Here's an equivalent script for Java 7 / Rhino:

importClass(org.apache.commons.io.IOUtils)
importClass(java.nio.charset.StandardCharsets)
importClass(org.apache.nifi.processor.io.StreamCallback)
var flowFile = session.get();
if (flowFile != null) {

  flowFile = session.write(flowFile,
    new StreamCallback() {process : function(inputStream, outputStream) {
        var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        var obj = JSON.parse(text)
        var newObj = {
          "Range": 5,
          "Rating": obj.rating.primary.value,
          "SecondaryRatings": {}
        }
        for(var key in obj.rating){
          var attrName = key;
          var attrValue = obj.rating[key];
          if(attrName != "primary") {
            newObj.SecondaryRatings[attrName] = {"id": attrName, "Range": 5, "Value": attrValue.value};
          }
        }
        outputStream.write(new java.lang.String(JSON.stringify(newObj, null, '\t')).getBytes(StandardCharsets.UTF_8))
    }})
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
  session.transfer(flowFile, REL_SUCCESS)
}