---
layout: post
status: PUBLISHED
published: true
title: Real-Time SQL on Event Streams
id: dfac7edd-8713-4ebe-bdf8-d7bf25683a30
date: '2017-06-06 14:52:53 -0400'
categories: nifi
tags:
- record
- nifi
- sql
- event
permalink: nifi/entry/real-time-sql-on-event
---
<h1>Real-Time SQL On Event Streams</h1>
<p>
<span class="author">Mark Payne - </span><br />
<span class="author"><a href="https://twitter.com/dataflowmark">@dataflowmark</a></span></p>
<hr />
<p>
Apache NiFi has grown tremendously over the past 2 and a half years since it was<br />
open sourced. The community is continuously thinking of, implementing, and contributing<br />
amazing new features. The newly released version 1.2.0 of NiFi is no exception. One of the most<br />
exciting features of this new release is the QueryRecord Processor and the Record Reader and<br />
Record Writer components that go along with it. If you aren't familiar with those components,<br />
there's a <a href="https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi">blog post</a><br />
that explains how those work.</p>
<p>
This new Processor, powered by <a href="https://calcite.apache.org/">Apache Calcite</a>,<br />
allows users to write SQL SELECT statements to run over their data<br />
as it streams through the system. Each FlowFile in NiFi can be treated as if it were<br />
a database table named FLOWFILE. These SQL queries can be used to filter specific columns<br />
or fields from your data, rename those columns/fields, filter rows, perform calculations<br />
and aggregations on the data, route the data, or whatever else you may want to use SQL<br />
for. All from the comfy confines of the most widely known and used Domain Specific Language.</p>
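<p>
For example, assuming the records flowing through contain fields like item_name, price, and quantity (the same<br />
sample purchase data used later in this post), a quick illustrative sketch of such a query might look like this:</p>
<pre>
<code>
-- Illustrative sketch only: keep the larger line items, rename a column,
-- and compute a new field. Field names match the sample data shown later.
SELECT item_name AS item,
       price * quantity AS line_total
FROM FLOWFILE
WHERE quantity > 10
</code>
</pre>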
<p>
This is a big deal! Of course, there are already other platforms that allow you to run SQL over arbitrary<br />
data outside of an RDBMS. So let's touch on how NiFi differs from those platforms:</p>
<ul>
<li style="margin-top: 1em;">
<b>Queries are run locally.</b> There is no need to push the data to some external service,<br />
such as S3, in order to run queries over your data. There's also no need to pay for<br />
that cloud storage or the bandwidth, or to create temporary "staging tables" in a database.
</li>
<li style="margin-top: 1em;">
<b>Queries are run inline.</b> Since your data is already streaming through NiFi, it is very<br />
convenient to add a new QueryRecord Processor to your canvas. You <i>are</i> already<br />
streaming your data through NiFi, aren't you? If not, you may find<br />
<a href="http://nifi.apache.org/docs.html">Our Docs Page</a> helpful to get started learning<br />
more about NiFi.
</li>
<li style="margin-top: 1em;">
<b>Query data in any format. Write results in any data format.</b> One of the goals of this effort was<br />
to allow data to be queried regardless of the format. In order to accomplish this, the Processor<br />
was designed to be configured with a "Record Reader" Controller Service and a "Record Writer"<br />
Controller Service. Out of the box, there are readers for CSV, JSON, Avro, and even log data. The results<br />
of the query can be written out in CSV, JSON, Avro, or free-form text (for example, a log format)<br />
using the <a href="http://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html">NiFi Expression Language</a>.<br />
If your data is in another format, you are free to write your own implementation of the Record Reader<br />
and/or Record Writer Controller Service. A simple implementation can wrap an existing library to return<br />
Record objects from an InputStream. That's all that is needed to run SQL over your own custom data<br />
format! Writing the results in your own custom data format is similarly easy.
</li>
<li style="margin-top: 1em;">
<b>It's fast - very fast!</b> Just how fast largely depends on the performance of your disks and the number of disks that you have<br />
available. The data must be read from disk, queried, and then the results written to disk. In most scenarios, though, reading the data<br />
from disk is avoided entirely thanks to operating system disk caching. We've tested the performance on a<br />
server with 32 cores and 64 GB RAM. We ran a continuous stream of JSON-formatted Provenance data through the Processor.<br />
To do this, we used the<br />
<a href="http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-site-to-site-reporting-nar/1.2.0/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/index.html">NiFi Provenance Reporting Task</a><br />
to send the Provenance data back to the NiFi instance. Because we wanted to stress the Processor, we then<br />
used a DuplicateFlowFile processor to create 200 copies of the data (this really just creates 200 pointers to the same piece of data;<br />
it doesn't copy the data itself). We used a SQL query to pull out any Provenance "SEND" event (a small percentage<br />
of the data, in this case); a sketch of such a query is shown just after this list. Using 12 concurrent tasks, we were able to see the query running at a consistent rate of about 1.2 GB per second<br />
on a single node - using less than half of the available CPU!
</li>
<li style="margin-top: 1em;">
<b>Data Provenance keeps a detailed trail of what happened.</b> One of the biggest differentiators between<br />
NiFi and other dataflow platforms is the detailed Data Provenance that NiFi records. If the data that<br />
you end up with is not what you expect, the Data Provenance feature makes it easy to see exactly what<br />
the data looked like at each point in the flow and pinpoint exactly what went wrong - as well as understand<br />
where the data came from and where it went. When the flow has<br />
been updated, the data can be replayed with the click of a button and the new results can be verified.<br />
If the results are still not right, update your flow and replay again.
</li>
</ul>
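<p>
As a point of reference for the performance test described in the list above, the query to pull out Provenance "SEND"<br />
events can be a very simple one. The sketch below assumes that the Provenance records expose the event type in a field<br />
named eventType; substitute whatever field name your Provenance record schema actually uses:</p>
<pre>
<code>
-- Sketch of the kind of query used in the performance test: keep only "SEND" events.
-- The field name eventType is an assumption; adjust it to match your schema.
SELECT *
FROM FLOWFILE
WHERE eventType = 'SEND'
</code>
</pre>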
<h2 style="margin-top: 75px;">How It Works</h2>
<p>
In order to get started, we need to add a QueryRecord Processor to the graph. Once we've added the Processor<br />
to the graph, we need to configure three things: the Controller Service to use for reading data, the service<br />
to use for writing the results, and one or more SQL queries. The first time that you set this all up, it may<br />
seem like there's a lot going on. But once you've done it a couple of times, it becomes pretty trivial. The<br />
rest of this post will be dedicated to setting everything up. First, we will configure the Record Reader Controller<br />
Service. We'll choose to create a new service:</p>
<p><img class="dialog" style="width: 50%; height: 50%" src="https://blogs.apache.org/nifi/mediaresource/6fe0e0bf-58b6-4557-a677-109b2a1ffd55" /></p>
<p>
We are given a handful of different types of services to choose from. For this example, we will use CSV data, so<br />
we will use a CSVReader:</p>
<p><img class="dialog" style="width: 50%; height: 50%" src="https://blogs.apache.org/nifi/mediaresource/160ebe02-72e2-4b12-ba3a-a759648b75b8" /></p>
<p>
We will come back and configure our new CSVReader service in a moment. For now, we will click the "Create" button and then choose to create a new<br />
Controller Service to write the results. We will write the results in JSON format:</p>
<p><img class="dialog" style="width: 50%; height: 50%" src="https://blogs.apache.org/nifi/mediaresource/6d8b5221-b67a-46e0-bbae-8bf65aff6a16" /></p>
<p>
We can again click the "Create" button to create the service. Now that we have created our Controller Services, we will click the "Go To" button on our reader service to jump<br />
to the Controller Service configuration page:</p>
<p><a href="https://blogs.apache.org/nifi/mediaresource/f7396001-ee40-4a0d-a7b7-39aaa463c2cd"><br />
<img class="screenshot" src="https://blogs.apache.org/nifi/mediaresource/f7396001-ee40-4a0d-a7b7-39aaa463c2cd" /><br />
</a></p>
<h2 style="margin-top: 75px;">Configuring the Reader</h2>
<p>
We can click the pencil in the right-hand column to configure our CSV Reader:</p>
<p><img class="dialog" style="width: 50%; height: 50%" src="https://blogs.apache.org/nifi/mediaresource/3f118244-6b1b-4e68-b96a-07228aa87ec6" /></p>
<p>
The CSV Reader gives us plenty of options to customize the reader to our format, as can be seen in the above image. For this example, we will<br />
leave most of the defaults, but we will change the "Skip Header Line" Property from the default value of "false" to "true" because our data will contain a header<br />
line that we don't want to process as an actual record.</p>
<p>
In order to run SQL over our data and make sense of the columns in our data, we need<br />
to also configure the reader with a schema for the data. We have several options for doing<br />
this. We can use an attribute on the FlowFile that includes an Avro-formatted schema. Or we<br />
can use a Schema Registry to store our schema and access it by name or by identifier and version.<br />
Let's go ahead and use a local Schema Registry and add our schema to that Registry. To do so,<br />
we will change the "Schema Access Strategy" to "Use 'Schema Name' Property." This means that<br />
we want to look up the Schema to use by name. The name of the schema is specified by the "Schema Name"<br />
Property. The default value for that property is "${schema.name}", which means that we will just use<br />
the "schema.name" attribute to identify which schema we want to use. Instead of using an attribute,<br />
we could just type the name of the schema here. Doing so would mean that we would need a different<br />
CSV Reader for each schema that we want to read, though. By using an attribute, we can have a single<br />
CSV Reader that works for all schemas.</p>
<p>
Next, we need to specify the Schema<br />
Registry to use. We click the value of the "Schema Registry" property and choose to "Create new service..."<br />
We will use the AvroSchemaRegistry. (Note that our incoming data is in CSV format and the output will<br />
be in JSON. That's okay; Avro in this sense refers only to the format of the schema provided. So we will<br />
provide a schema in the same way that we would if we were using Avro.) We will click the "Create" button<br />
and then click the "Go To" arrow that appears in the right-hand column in order to jump to the<br />
Schema Registry service (and click 'Yes' to save changes to our CSV Reader service).</p>
<p>
This will take us back to our Controller Services configuration screen. It is important<br />
to note that from this screen, each Controller Service has a "Usage" icon on the left-hand<br />
side (it looks like a little book). That icon will take you to the documentation on how to<br />
use that specific Controller Service. The documentation is fairly extensive. Under the<br />
"Description" heading, each of the Record Readers and Writers has an "Additional Details..."<br />
link that provides much more detailed information about how to use the service and provides<br />
examples.</p>
<p>
We will click the Edit ("pencil") icon next to the newly created AvroSchemaRegistry and go to<br />
the Properties tab. Notice that the service has no properties, so we click the New Property ("+") icon<br />
in the top-right corner. The name of the property is the name that we will use to refer to the schema.<br />
Let's call it "hello-world". For the value, we can just type or paste in the schema that we want to use<br />
using the <a href="https://avro.apache.org/docs/1.8.1/spec.html#schemas">Avro Schema syntax</a>.<br />
For this example, we will use the following schema:</p>
<pre>
<code>
{
  "name": "helloWorld",
  "namespace": "org.apache.nifi.blogs",
  "type": "record",
  "fields": [
    { "name": "purchase_no", "type": "long" },
    { "name": "customer_id", "type": "long" },
    { "name": "item_id", "type": ["null", "long"] },
    { "name": "item_name", "type": ["null", "string"] },
    { "name": "price", "type": ["null", "double"] },
    { "name": "quantity", "type": ["null", "int"] },
    { "name": "total_price", "type": ["null", "double"] }
  ]
}
</code>
</pre>
<p>
Now we can click "OK" and apply our changes. Clicking Enable (the lightning bolt icon) enables the service.<br />
We can now also enable our CSV Reader.</p>
<h2 style="margin-top: 75px;">Configuring the Writer</h2>
<p>
Similarly, we need to configure our writer with a schema so that NiFi knows how<br />
we expect our data to look. If we click the Pencil icon next to our JSONRecordSetWriter,<br />
in the Properties tab, we can configure whether we want our JSON data to be pretty-printed<br />
or not and how we want to write out date and time fields. We also need to specify how to access<br />
the schema and how to convey the schema to downstream processors. For the "Schema Write Strategy,"<br />
since we are writing JSON, simply setting the "schema.name" attribute is enough, so we will leave the default value.<br />
If we were writing in Avro, for example, we would probably want to include the schema in the data itself.<br />
For the "Schema Access Strategy," we will use the "Schema Name" property, and set the "Schema Name" property<br />
to "${schema.name}" just as we did with the CSV Reader. We then select the same AvroSchemaRegistry service for<br />
the "Schema Registry" property.</p>
<p><img class="dialog" style="width: 50%; height: 50%" src="https://blogs.apache.org/nifi/mediaresource/9dbf2d38-50bc-4b23-b1af-d9d111cbcd32" /></p>
<p>
Again, we click "Apply" and then enable our Controller Service by clicking the Lightning icon followed by the Enable button. We can<br />
then click the "X" to close out this dialog.</p>
<h2 style="margin-top: 75px;">Write the SQL</h2>
<p>
Now comes the fun part! We can go back to configure our QueryRecord Processor.<br />
In the Properties tab, we can start writing our queries. For this example, let's<br />
take the following CSV data:</p>
<pre>
<code>
purchase_no, customer_id, item_id, item_name, price, quantity
10280, 40070, 1028, Box of pencils, 6.99, 2
10280, 40070, 4402, Stapler, 12.99, 1
12440, 28302, 1029, Box of ink pens, 8.99, 1
28340, 41028, 1028, Box of pencils, 6.99, 18
28340, 41028, 1029, Box of ink pens, 8.99, 18
28340, 41028, 2038, Printer paper, 14.99, 10
28340, 41028, 4018, Clear tape, 2.99, 10
28340, 41028, 3329, Tape dispenser, 14.99, 10
28340, 41028, 5192, Envelopes, 4.99, 45
28340, 41028, 3203, Laptop Computer, 978.88, 2
28340, 41028, 2937, 24\" Monitor, 329.98, 2
49102, 47208, 3204, Powerful Laptop Computer, 1680.99, 1
</code>
</pre>
<p>
In our Properties tab, we can click the "Add Property" button to add a new property.<br />
Because we can add multiple SQL queries in a single Processor, we need a way to distinguish the results of each<br />
query and route the data appropriately. As such, the name of the property is the name of the Relationship that<br />
data matching the query should be routed to. We will create two queries. The first will be named "over.1000" and<br />
will include the purchase_no and customer_id fields of any purchase that costs more than $1,000.00 and will also<br />
include a new field named total_price that is the dollar amount for the entire purchase. (With the sample data above, purchases 28340 and 49102 should match.) Note that when entering<br />
a value for a property in NiFi, you can use Shift + Enter to insert a newline in your value:</p>
<pre>
<code>
SELECT purchase_no, customer_id, SUM(price * quantity) AS total_price
FROM FLOWFILE
GROUP BY purchase_no, customer_id
HAVING SUM(price * quantity) > 1000
</code>
</pre>
<p>
The second property will be named "largest.order" and will contain the purchase_no, customer_id, and total price<br />
of the most expensive single purchase (as defined by the sum of price times quantity over its line items) in the data - purchase 28340 in our sample:</p>
<pre>
<code>
SELECT purchase_no, customer_id, SUM(price * quantity) AS total_price
FROM FLOWFILE
GROUP BY purchase_no, customer_id
ORDER BY total_price DESC
LIMIT 1
</code>
</pre>
<p>
Now we will wire our QueryRecord processor up in our graph so that we can use it.<br />
For this demo, we will simply use a GenerateFlowFile to feed it data. We will set the<br />
"Custom Text" property to the CSV that we have shown above. In the "Scheduling" tab,<br />
I'll configure the processor to run once every 10 seconds so that I don't flood the system with<br />
data. We need to add a "schema.name"<br />
attribute, so we will route the "success" relationship of GenerateFlowFile to an UpdateAttribute<br />
processor. To this processor, we will add a new Property named "schema.name" with a value of<br />
"hello-world" (to match the name of the schema that we added to the AvroSchemaRegistry service).<br />
We will route the "success" relationship to QueryRecord.</p>
<p>
Next, we will create two UpdateAttribute Processors and connect the "over.1000" relationship<br />
to the first and the "largest.order" relationship to the other. This just gives us a simple<br />
place to hold the data so that we can view it. I will loop the "failure" relationship back<br />
to the QueryRecord processor so that if there is a problem evaluating the SQL, the data<br />
will remain in my flow. I'll also auto-terminate the "original" relationship because once<br />
I have evaluated the SQL, I don't care about the original data anymore. I'll also auto-terminate<br />
the "success" relationship of each terminal UpdateAttribute processor. When we start the<br />
QueryRecord, GenerateFlowFile, and the first UpdateAttribute processors, we end up with<br />
a FlowFile queued up before each UpdateAttribute processor:</p>
<p><a href="https://blogs.apache.org/nifi/mediaresource/a68753b4-40ab-433f-bd78-e552f3b5e7f4"><br />
<img class="screenshot" src="https://blogs.apache.org/nifi/mediaresource/a68753b4-40ab-433f-bd78-e552f3b5e7f4" /><br />
</a></p>
<p>
If we right-click on the "over.1000" connection and click "List queue," we are able to see<br />
the FlowFiles in the queue:</p>
<p><a href="https://blogs.apache.org/nifi/mediaresource/fc1af388-e1c5-4d13-9cb1-f052f21f2aa9"><br />
<img class="screenshot" src="https://blogs.apache.org/nifi/mediaresource/fc1af388-e1c5-4d13-9cb1-f052f21f2aa9" /><br />
</a></p>
<p>
Clicking the "information" icon on the left-hand-side of the FlowFile gives us the ability<br />
to view the FlowFile's attributes and download or view its contents. We can click the "View"<br />
button to view the content. Changing the "View as" drop-down at the top from "original" to<br />
"formatted" gives us a pretty-printed version of the JSON, and we quickly see the results that we want:</p>
<p><a href="https://blogs.apache.org/nifi/mediaresource/9fc4b6f3-ac32-4491-9e4e-096235ad141a"><br />
<img class="screenshot" src="https://blogs.apache.org/nifi/mediaresource/9fc4b6f3-ac32-4491-9e4e-096235ad141a" /><br />
</a></p>
<p>
Note that we have a null value for the columns that are defined in our schema but were not part of our query results.<br />
If we wanted to, we could certainly update our schema in order to avoid having these fields show up at all - for<br />
example, by registering a second, trimmed-down schema that contains only purchase_no, customer_id, and total_price<br />
and configuring the writer to use it.</p>
<p>
Viewing the FlowFiles of the "largest.order" connection likewise shows us a single FlowFile, with<br />
the content that we expect:</p>
<p><a href="https://blogs.apache.org/nifi/mediaresource/a3398aa6-3433-469c-bd97-532516851b3e"><br />
<img class="screenshot" src="https://blogs.apache.org/nifi/mediaresource/a3398aa6-3433-469c-bd97-532516851b3e" /><br />
</a></p>
<p>
Of course, if we have already run the data through the flow and want to go back and inspect<br />
that data and what happened to it, we can easily do that through NiFi's powerful<br />
<a href="http://nifi.apache.org/docs/nifi-docs/html/user-guide.html#data_provenance">Data Provenance</a><br />
feature.</p>
<h2 style="margin-top: 75px;">Conclusion</h2>
<p>
Here, I have given but a small glimpse into the capabilities of the new QueryRecord Processor for NiFi.<br />
The possibilities that it opens up are huge. I'd love to hear thoughts and ideas on how to make it even<br />
better or questions that you may have! Feel free to reach out to the NiFi mailing list at users@nifi.apache.org<br />
or dev@nifi.apache.org.</p>