Fetching weather observations

Whatever your business is, weather conditions affect its performance. The correlation between the two needs to be tracked and analyzed within your warehouse ecosystem.

Apache NiFi, with its variety of connectors, seems to be a perfect tool for ingesting weather observations from public datasets. In this post we show how to connect to the Deutscher Wetterdienst (http://dwd.de), Germany's National Meteorological Service.

Deutscher Wetterdienst runs an open data server, and https://opendata.dwd.de/ allows us to fetch weather observations from German monitoring stations.

Fetch from FTP Server

Data is available on an open FTP server, and each file contains recent data from a single monitoring station:

[Screenshot: directory listing of the DWD FTP server]
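
Before wiring anything up in NiFi, we can sanity-check the listing with a few lines of Python. The remote path below is an assumption, so check the current layout of the server:

    # Minimal sketch: list the ZIP files on the DWD open data server.
    # REMOTE_PATH is an assumption -- the directory layout may differ.
    from ftplib import FTP

    REMOTE_PATH = "/climate_environment/CDC/observations_germany/climate/daily/kl/recent"

    ftp = FTP("opendata.dwd.de")
    ftp.login()  # anonymous login
    ftp.cwd(REMOTE_PATH)
    for name in ftp.nlst():
        if name.endswith(".zip"):
            print(name)
    ftp.quit()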

We can start with the ListFTP processor to list the files contained within a directory:

[Screenshot: ListFTP processor]

We provide a hostname, a port, and a remote path to fetch. We can also filter the listed files so that we only receive the ZIP files with weather observations.

[Screenshot: ListFTP processor properties]

We also provide a username and password. This is actually something that did not work for us with the standard NiFi processors. We use "anonymous" as the username and "anonymous@domain.com" as the password. When the connection is established, the username and password are sent simultaneously. After sending the username, the server responds with code 331 ("User name okay, need password"), which the standard processor mistakenly treats as an error (a code other than 2xx).
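
To see the exchange in action, here is the same login replayed with Python's ftplib, which handles the intermediate 331 reply correctly:

    from ftplib import FTP

    ftp = FTP("opendata.dwd.de")
    print(ftp.getwelcome())                          # 220 welcome banner
    print(ftp.sendcmd("USER anonymous"))             # expect a 331 reply here
    print(ftp.sendcmd("PASS anonymous@domain.com"))  # expect a 230 login reply
    ftp.quit()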

Fortunately, we can prepare a custom in-house processor that fixes this issue!

[Screenshot: custom FTP processor]

We then fetch each listed file with the FetchFTP processor.
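
Outside of NiFi, the equivalent of FetchFTP is a plain binary download. A minimal sketch (directory and file names are placeholders):

    import os
    from ftplib import FTP

    def fetch(ftp: FTP, filename: str, local_dir: str = "downloads") -> str:
        """Download one remote file, like FetchFTP does for each FlowFile."""
        os.makedirs(local_dir, exist_ok=True)
        local_path = os.path.join(local_dir, filename)
        with open(local_path, "wb") as out:
            ftp.retrbinary("RETR " + filename, out.write)
        return local_path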

Extract ZIP Content

The next step is to extract the ZIP content of the file with the UnpackContent processor:

[Screenshot: UnpackContent processor configuration]

Each ZIP file contains data and metadata files, and the "File Filter" regex can be applied to keep only the files we need.

[Screenshot: UnpackContent "File Filter" property]
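
In plain Python, the same unpack-and-filter step could look like this (the member-name pattern is an assumption based on DWD's "produkt_*.txt" naming):

    import re
    import zipfile

    DATA_FILE_PATTERN = re.compile(r"produkt.*\.txt")  # assumed data-file naming

    with zipfile.ZipFile("downloads/station.zip") as archive:  # placeholder name
        for member in archive.namelist():
            if DATA_FILE_PATTERN.fullmatch(member):
                archive.extract(member, path="unpacked")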

Record Schema and CSV Parsing

The fetched CSV files are of the form:

[Screenshot: sample CSV file content]
We start with a ReplaceText processor to replace the end-of-record marker "eor\n" with a newline.
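
In Python terms, the substitution is trivial (the sample line below is abbreviated and made up):

    # Turn the "eor" record terminators into plain newlines
    # so a standard CSV reader can parse the file.
    raw = "2667;20180506;...;eor\n2667;20180507;...;eor\n"  # abbreviated sample
    clean = raw.replace("eor\n", "\n")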

Then we create a ConvertRecord processor:

[Screenshot: ConvertRecord processor configuration]

It requires two configuration fields, CSVReader and JSONRecordSetWriter, which are controller services:

[Screenshot: CSVReader and JSONRecordSetWriter controller services]

To configure the CSVReader, we set the CSV parsing details and also need to create an AvroSchemaRegistry service. The CSVReader contains a "Schema Name" field; we can name our schema "observations" and define it within the AvroSchemaRegistry in the next step.

[Screenshot: CSVReader configuration]

Below you can see the Avro definition of the schema:

[Screenshot: Avro schema definition]
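
Since the screenshot is not reproduced here, a schema along these lines would do. The field list is an assumption based on the DWD daily climate columns (only a few are shown); every field is a string, for the reason explained below:

    {
      "type": "record",
      "name": "observations",
      "doc": "Field list is an assumption based on DWD daily climate (KL) columns",
      "fields": [
        {"name": "STATIONS_ID", "type": "string"},
        {"name": "MESS_DATUM",  "type": "string"},
        {"name": "TMK",         "type": "string"},
        {"name": "TXK",         "type": "string"},
        {"name": "TNK",         "type": "string"}
      ]
    }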

We can declare all fields as "string". In the end we put our data into BigQuery, where the JSON fields will be cast automatically based on the BigQuery table definition, so we do not need to take care of types now.

Split JSON and Put to BigQuery

Apache NiFi allows us to view the messages queued within a connection. At this point, we have a JSON array with each CSV line as a separate JSON object.

[Screenshot: JSON messages queued in the connection]
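
The queued content looks roughly like this (field names match the assumed schema above, and the values are made up for illustration):

    [
      {"STATIONS_ID": "2667", "MESS_DATUM": "20180506", "TMK": "18.1"},
      {"STATIONS_ID": "2667", "MESS_DATUM": "20180507", "TMK": "19.3"}
    ]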

We can apply the SplitJson and PutBigQuery processors as we did in the post "Facebook to BigQuery Pipeline with NiFi".

Finally, we have the data from those unwieldy zipped CSV files on an FTP server in a clean relational format, accessible via SQL in a modern big data warehouse.

We can run a simple query to find the row with the minimum value of the tmk column (mean temperature).

[Screenshot: query results in BigQuery]
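
The query itself can be as simple as the sketch below (project, dataset, and table names are placeholders):

    -- Row with the lowest mean temperature; table name is a placeholder.
    SELECT *
    FROM `my_project.weather.observations`
    ORDER BY tmk ASC
    LIMIT 1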

Summary

It does not matter whether we want to move data into BigQuery or into any other modern big data warehouse; the key point is to fetch it from various sources, and Apache NiFi lets us do exactly that. In this post we did it with Deutscher Wetterdienst data. With a couple of clicks and drag-and-drops we were able to fetch files from FTP, extract them from ZIP archives, parse their content, and split the records to fit the relational model within BigQuery.

How much time would it take a software developer to write such a pipeline from scratch?

 
