Import CSV Data into GridDB Using Apache Nifi

Israel Imru
Aug 24, 2021

In this tutorial, we will import comma-separated value (CSV) data into GridDB using the popular ETL tool, Apache Nifi. Nifi is an enterprise-ready data plumbing platform that is highly configurable and extensible.

ETL is an acronym that stands for Extract, Transform, and Load, which simply means copying data from a source to a destination while reshaping it so that it fits the destination's format and context. While there are many advanced enterprise ETL tools, many developers have used basic text-processing tools like awk, sed, and grep to build rudimentary ETL pipelines.

In this tutorial, we will cover three different transformations that most GridDB users would like to implement:

  • Partition data into multiple tables based on a record column value.
  • Combine multiple columns into one.
  • Convert a datetime string to a timestamp value that can be inserted into GridDB.

Setup Nifi/GridDB

Nifi’s installation is simple; download and untar the tarball before starting the server:
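
The exact commands depend on the release you download; as an example, assuming Nifi 1.13.2:

    wget https://archive.apache.org/dist/nifi/1.13.2/nifi-1.13.2-bin.tar.gz
    tar xzf nifi-1.13.2-bin.tar.gz
    cd nifi-1.13.2
    bin/nifi.sh start

Once the server has started, the web interface is available at http://localhost:8080/nifi by default.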

If you want to access the Nifi server from a remote computer, edit conf/nifi.properties, changing nifi.web.http.host to your local IP address.
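
For example, if your machine's address were 192.168.1.100 (substitute your own), the relevant lines in conf/nifi.properties would look like this:

    nifi.web.http.host=192.168.1.100
    nifi.web.http.port=8080

Restart Nifi after changing these properties for them to take effect.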

If you do not already have GridDB set up, follow the Getting Started manual to first install GridDB. You will also need to use the gridstore-jdbc.jar from GridDB.net's GitHub repository, which implements additional JDBC functions required by Nifi.

The Data Set

We’re going to use the Historic NYPD Complaint Data made available by NYC Open Data here. The data has nearly 7 million rows across 35 columns, totalling just over 2GB. We’ve also made available a 1,000-row version for testing that you can download from the GridDB.net GitHub repository here.

Build the Nifi Flow

The Nifi Flow defines how data is extracted, transformed, and loaded with a series of Processors and Controllers. A Processor performs a task whose inputs and outputs are defined by its configured Controllers. If you’re new to Nifi, the navigation can be difficult: Processors are added by dragging the first icon on the top bar into the workspace, while Controllers are added to your Flow by clicking the “Gear Icon” in the “Operate Panel” and then selecting the “Controller Services” tab.

The following screenshot depicts the overall flow and how the various processors are connected:

The LogAttribute processors are generic and simply log the number of failed or successful records output by each processor.

The AvroRegistry Controller defines the schemas referenced by the other controllers: in this case, the schema of the CSV file we’re reading and the schema of the GridDB table/container that we’re writing to.

Most rows in the NYCC data are missing one or more fields, which can cause errors, so defaults are set in the input schema as follows:
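
The input schema itself is ordinary Avro JSON. A trimmed-down sketch (only a few of the 35 columns are shown, the record name is arbitrary, and every field is a string with an empty-string default) looks like this:

    {
      "type": "record",
      "name": "nycc_input",
      "fields": [
        { "name": "CMPLNT_NUM",   "type": "string", "default": "" },
        { "name": "CMPLNT_FR_DT", "type": "string", "default": "" },
        { "name": "CMPLNT_FR_TM", "type": "string", "default": "" },
        { "name": "CMPLNT_TO_DT", "type": "string", "default": "" },
        { "name": "CMPLNT_TO_TM", "type": "string", "default": "" },
        { "name": "ADDR_PCT_CD",  "type": "string", "default": "" },
        { "name": "OFNS_DESC",    "type": "string", "default": "" }
      ]
    }

The remaining columns follow the same pattern.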

The defaults aren’t required in the output schema as the fields have been populated, but two fields are added for the CMPLNT_TO/FR timestamp:
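
In the output schema those two fields can be declared as Avro logical timestamps. A sketch of just the added fields (the names CMPLNT_FR and CMPLNT_TO are our choice here; use whatever matches your table):

    { "name": "CMPLNT_FR", "type": { "type": "long", "logicalType": "timestamp-millis" } },
    { "name": "CMPLNT_TO", "type": { "type": "long", "logicalType": "timestamp-millis" } }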

The DBCPConnectionPool controller is responsible for all database connections within our flow and it uses the GridDB JDBC interface. It is configured with the JDBC driver class, JAR path, connection URL, username, and password as follows:
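
The values below are representative; substitute the host, cluster name, credentials, and JAR path for your own GridDB installation:

    Database Connection URL        jdbc:gs://127.0.0.1:20001/myCluster/public
    Database Driver Class Name     com.toshiba.mwcloud.gs.sql.Driver
    Database Driver Location(s)    /usr/share/java/gridstore-jdbc.jar
    Database User                  admin
    Password                       admin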

GetFile

The GetFile processor reads the CSV file from disk. All that needs to be configured is the path where the CSV will be read. It should be noted that the CSV will be deleted after being read, so it is best to create a new staging directory and copy in files as required.
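
A minimal configuration might look like this (the staging path is just an example):

    Input Directory     /home/nifi/csv-staging
    Keep Source File    false

With Keep Source File left at false (the default), the CSV is removed from the staging directory once it has been picked up.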

PartitionRecord

The first PartitionRecord processor splits the records from the flow based on a column value. In the case of the New York Crime Complaint data, we’re going to split the data by precinct which is the ADDR_PCT_CD field. It works by setting the ${precinct} attribute to the value of the /ADDR_PCT_CD field. PutDatabaseRecord and PutSQL processors will use the ${precinct} attribute to determine the table name. This split allows us to put data for individual police precincts into separate tables/containers.
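
The key configuration is a single user-defined property mapping the attribute name to a record path; the reader and writer are the raw CSV controllers described below:

    Record Reader    RawCSVReader
    Record Writer    RawCSVRecordSetWriter
    precinct         /ADDR_PCT_CD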

The RawCSVReader controller is used to read the Raw CSV from GetFile while the RawCSVRecordSetWriter re-writes the CSV for the next Processor in the chain.
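
Both controllers pull their schema from the AvroRegistry. A rough sketch of the RawCSVReader settings (the schema name must match whatever you registered in the AvroRegistry):

    Schema Access Strategy        Use 'Schema Name' Property
    Schema Registry               AvroRegistry
    Schema Name                   nycc_input
    Treat First Line as Header    true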

PutSQL

We need to create the table for a precinct before PutDatabaseRecord can write; PutSQL is the best processor for that. It reads the ${precinct} attribute and executes the following SQL statement:
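
A statement along these lines will work; the precinct_${precinct} naming convention and the trimmed column list are our own choices here, and GridDB accepts standard SQL types such as STRING, INTEGER, and TIMESTAMP:

    CREATE TABLE IF NOT EXISTS precinct_${precinct} (
        CMPLNT_NUM   STRING,
        CMPLNT_FR    TIMESTAMP,
        CMPLNT_TO    TIMESTAMP,
        ADDR_PCT_CD  INTEGER,
        OFNS_DESC    STRING
        -- ...remaining columns omitted for brevity
    );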

PartitionRecord

The second PartitionRecord processor splits up the input so that each row of the CSV becomes one record. Records are still read and written by RawCSVReader and RawCSVRecordSetWriter respectively.

The first UpdateRecord processor changes the date format of the CSV from MM/dd/yyyy to yyyy-MM-dd as that is the expected format to convert into a timestamp in the next processor.

The property updates used are:
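
With the Replacement Value Strategy set to Literal Value, each date column can be rewritten in place using Nifi's Expression Language. Shown here for the two date fields; adjust the record paths to match your schema:

    /CMPLNT_FR_DT    ${field.value:toDate('MM/dd/yyyy'):format('yyyy-MM-dd')}
    /CMPLNT_TO_DT    ${field.value:toDate('MM/dd/yyyy'):format('yyyy-MM-dd')}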

The second UpdateRecord processor combines the date and time fields and then converts the record to Avro by using the AvroRecordSetWriter as the output. By converting the record from CSV to Avro we are able to convert a date string such as 2021-02-11 18:14:17 into an epoch value (1613067257 in UTC) that can be written to the timestamp field by the PutDatabaseRecord processor.

The property updates used are:
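
Here the Replacement Value Strategy is set to Record Path Value so that the date and time columns can be concatenated with the RecordPath concat() function. The CMPLNT_FR and CMPLNT_TO target fields match the output schema sketched earlier:

    /CMPLNT_FR    concat(/CMPLNT_FR_DT, ' ', /CMPLNT_FR_TM)
    /CMPLNT_TO    concat(/CMPLNT_TO_DT, ' ', /CMPLNT_TO_TM)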

The AvroRecordSetWriter controller is configured as below:
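
Its settings mirror the CSV controllers, except that it writes against the output schema. Roughly:

    Schema Write Strategy     Embed Avro Schema
    Schema Access Strategy    Use 'Schema Name' Property
    Schema Registry           AvroRegistry
    Schema Name               nycc_output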

The last processor in the chain, PutDatabaseRecord, writes the incoming records to GridDB using the DBCPConnectionPool controller. The records are read with the AvroReader controller, which is configured as shown:
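
Since the AvroRecordSetWriter embeds the schema, the AvroReader only needs its Schema Access Strategy left at Use Embedded Avro Schema. A representative PutDatabaseRecord configuration (the table name reuses the ${precinct} attribute set by the first PartitionRecord processor) would then be:

    Record Reader                          AvroReader
    Database Type                          Generic
    Statement Type                         INSERT
    Database Connection Pooling Service    DBCPConnectionPool
    Table Name                             precinct_${precinct}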

Finally, to start the ETL process, we copy the downloaded rows.csv to the input directory we configured in the GetFile processor and can monitor the load from the Nifi web interface.

This tutorial has demonstrated how to load a large dataset stored in CSV into GridDB. With the extensibility of Nifi, it is possible to Extract, Transform, and Load nearly any data set from any source to any destination without writing complex scripts. You can download the Nifi Flow template used in this tutorial here.

Originally published at http://github.com.
