Whatever your business is, weather conditions affect its performance. The correlation between the two needs to be tracked and analyzed within your data warehouse ecosystem.
Apache NiFi, with its wide variety of connectors, is a perfect tool for ingesting weather observations from public datasets. In this post we show how to connect to the Deutscher Wetterdienst (http://dwd.de), Germany's National Meteorological Service.
Fetch from FTP Server
The data is available on an open FTP server, and each file contains recent observations from a single monitoring station:
We start with a ListFTP processor to list the files contained within a directory:
We provide a hostname, a port, and a remote path. We can filter the listed files so that we only receive ZIP files containing weather observations.
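Outside NiFi, this listing-and-filtering step can be sketched with Python's standard `ftplib`. The host, remote path, and file-name pattern below are assumptions based on DWD's open-data layout, not values taken from the post:

```python
import re
from ftplib import FTP

# Assumed values — adjust to the actual DWD open-data layout.
HOST = "opendata.dwd.de"
REMOTE_PATH = "/climate_environment/CDC/observations_germany/climate/daily/kl/recent/"
FILE_FILTER = r"tageswerte_KL_\d{5}_akt\.zip"  # mirrors ListFTP's "File Filter Regex"

def is_observation_zip(name: str, pattern: str = FILE_FILTER) -> bool:
    """Keep only ZIP files with weather observations, like ListFTP's filter."""
    return re.fullmatch(pattern, name) is not None

def list_observation_files() -> list[str]:
    """List matching files in the remote directory via anonymous FTP."""
    with FTP(HOST) as ftp:
        ftp.login()  # anonymous login
        ftp.cwd(REMOTE_PATH)
        return [name for name in ftp.nlst() if is_observation_zip(name)]
```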
We also provide a username and password. This is something that actually did not work for us with the standard NiFi processors. We use 'anonymous' as the username and 'email@example.com' as the password. Once the connection is established, the username and password are sent in sequence: after the username is sent, the server responds with code 331 ("User name okay, need password"), which the standard processor mistakenly treats as an error (a code other than 2xx).
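The root cause is easy to illustrate: in FTP (RFC 959), reply codes starting with 3 are intermediate positive replies that ask for more input, not failures. A naive check that treats everything outside 2xx as an error breaks the USER/PASS sequence exactly as described above. A minimal sketch of the two checks:

```python
def buggy_is_error(reply_code: int) -> bool:
    """The faulty check: anything outside 2xx counts as an error."""
    return not (200 <= reply_code < 300)

def rfc959_is_error(reply_code: int) -> bool:
    """Per RFC 959, 1xx/2xx/3xx replies are positive; only 4xx/5xx fail.
    Code 331 ("User name okay, need password") just asks for the password."""
    return reply_code >= 400

# 331 arrives after USER is sent and before PASS:
# the buggy check aborts the login; the RFC-compliant check continues.
```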
Fortunately, we can prepare a custom in-house processor that fixes this issue!
Each listed file is then fetched with a FetchFTP processor:
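The FetchFTP step corresponds to an FTP RETR command. A small sketch using `ftplib` (any connected object exposing a `retrbinary` method works, which also makes it easy to test):

```python
from io import BytesIO

def fetch_file(ftp, remote_name: str) -> bytes:
    """Download one remote file into memory, as FetchFTP does per FlowFile.
    `ftp` is a connected ftplib.FTP instance (or anything with retrbinary)."""
    buf = BytesIO()
    ftp.retrbinary(f"RETR {remote_name}", buf.write)
    return buf.getvalue()
```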
Extract ZIP Content
The next step is to extract the ZIP content of the file with an UnpackContent processor:
Each ZIP file contains data and metadata files; a "File Filter" regex can be applied to keep only the files we need.
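The same filtered extraction can be sketched with Python's `zipfile`. The default pattern assumes the DWD data files inside each archive are named `produkt_*.txt` (with metadata in `Metadaten_*.txt`), which is an assumption about the archive layout:

```python
import io
import re
import zipfile

def extract_data_members(zip_bytes: bytes,
                         pattern: str = r"produkt.*\.txt") -> dict[str, bytes]:
    """Unpack only the members matching the filter regex, like
    UnpackContent's "File Filter" property."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        return {
            name: zf.read(name)
            for name in zf.namelist()
            if re.fullmatch(pattern, name)
        }
```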
Record Schema and CSV Parsing
The fetched CSV files are of the form:
We start with a ReplaceText processor to replace the end-of-record marker "eor\n" with a plain newline.
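The equivalent of this ReplaceText step in plain Python is a single substitution (the sample line below is illustrative, not taken from the actual DWD file):

```python
def strip_eor(text: str) -> str:
    """Replace the "eor" end-of-record marker plus newline with a plain
    newline, mirroring the ReplaceText processor configuration."""
    return text.replace("eor\n", "\n")
```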
Then we create a ConvertRecord processor:
It requires two configuration fields, CSVReader and JSONRecordSetWriter, which are controller services:
In the CSVReader we configure the CSV parsing details and also need to create an AvroSchemaRegistry service. The CSVReader has a "Schema Name" field; we name our schema "observations" and define it in the AvroSchemaRegistry in the next step.
Below you can see the Avro definition of the schema:
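An abbreviated, illustrative version of such a schema is shown here. The field names follow DWD's daily climate column convention and are assumptions; the real schema lists every column of the CSV file:

```json
{
  "type": "record",
  "name": "observations",
  "fields": [
    {"name": "stations_id", "type": "string"},
    {"name": "mess_datum", "type": "string"},
    {"name": "tmk", "type": "string"},
    {"name": "txk", "type": "string"},
    {"name": "tnk", "type": "string"}
  ]
}
```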
We can declare all fields as "string". In the end we put our data into BigQuery, where the JSON fields are cast automatically based on the BigQuery table definition, so we do not need to take care of types now.
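What ConvertRecord does here can be approximated in plain Python: parse delimited text with a header row and emit a JSON array of objects, keeping every value as a string. The semicolon delimiter and whitespace-padded values are assumptions about the DWD format:

```python
import csv
import io
import json

def csv_to_json_array(csv_text: str, delimiter: str = ";") -> str:
    """Convert header-prefixed delimited text into a JSON array of
    records, with every value kept as a string (as in our Avro schema)."""
    reader = csv.DictReader(io.StringIO(csv_text), delimiter=delimiter)
    records = [
        {key.strip(): (value or "").strip() for key, value in row.items()}
        for row in reader
    ]
    return json.dumps(records)
```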
Split JSON and Put to BigQuery
Apache NiFi lets us view the messages queued in a connection. At this point we have a JSON array with each CSV line as a separate JSON object.
We can apply SplitJson and PutBigQuery processors, as we did in the post "Facebook to BigQuery pipeline with NiFi".
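SplitJson's effect, one FlowFile per array element, can be sketched as a one-liner (the `$.*` JsonPath expression mentioned in the comment is a common choice, not a value confirmed by the post):

```python
import json

def split_json_array(json_array: str) -> list[str]:
    """Turn one JSON array into a list of standalone JSON objects,
    one per record — what SplitJson does with a path such as $.*."""
    return [json.dumps(record) for record in json.loads(json_array)]
```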
Finally, the data from those awkward zipped CSV files on an FTP server is available in a clean relational format, accessible via SQL in a modern big data warehouse.
We can run a simple query to find the row with the minimum value of the tmk column (daily mean temperature).
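Such a query might look like the sketch below. The table name is a placeholder, and since tmk arrives as a string in our schema, a cast is needed for numeric ordering:

```sql
SELECT *
FROM `my_project.weather.observations`
ORDER BY CAST(tmk AS FLOAT64) ASC
LIMIT 1
```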
It does not matter whether we want to move the data into BigQuery or any other modern big data warehouse; the key challenge is fetching it from various sources, and Apache NiFi lets us do exactly that. In this post we demonstrated it with Deutscher Wetterdienst data: with a couple of clicks and drag-and-drops we were able to fetch files from FTP, extract them from ZIP archives, parse their content, and split the records to fit the relational model in BigQuery.
How much time would it take a software developer to write such a pipeline from scratch?