Database Pipeline

About

The database pipeline allows us to copy data from one "database" to another. It primarily exists to extract data from Telkom's EDW environment (Oracle). Tools are available to refresh the tables in the source database before pulling data, as well as retention features for BigQuery and GCS. The pipeline currently supports the following:

  • Sources
    • Oracle via OJDBC
    • MySQL via OJDBC
  • Destinations
    • BigQuery via uploading a Parquet to GCS

Architecture

The pipeline uses Apache Sqoop to connect to the source and download the data, Spark to process the data on the local machine, and GCS to receive the resulting Parquet file. Once the file has been uploaded, it is automatically ingested into BigQuery according to the specified configuration.

Detailed description of database_to_gcp.py

When database_to_gcp.py runs, it does the following:

  1. The program first checks whether it should refresh data in the source database. It looks for a <table_name>.sql preSQL script in the <project_name>/presql/ folder. If the script exists, the program executes the queries in the script; otherwise the program moves on.
    • A presql_lim_hrs flag can be added to config.json to indicate the time in hours DBP should wait before running the preSQL script again. This is useful for big preSQL scripts that you don't want to rerun accidentally if the pipeline breaks downstream.
    • To set the preSQL time limit, include the following under source in config.json: "presql_lim_hrs": 0
    • When a preSQL script is available, it always executes if presql_lim_hrs is not specified in the config file or is set to 0.
  2. The program checks the schema of the source table against a previously saved schema to see if they match.
    • If there is no previous schema, the schema checker allows the program to continue.
    • If a change_approved flag has been added to the source key in the JSON and is set to 1, the program continues regardless of changes to the schema. The flag appears under source as "change_approved": 0 (shown here in its off state).
    • If the above conditions are not met and the schema does not match the previous schema, the program stops.
  3. If the schema is approved, the program uses Sqoop to download the data from Oracle to a local Parquet file.
  4. Once the download is complete, the program uses a Spark JDBC connection to pull the correct data types from the Oracle table and then converts the Parquet data types to those types.
  5. The program uploads the Parquet files and the trigger file to Google Cloud Storage.
  6. Lastly, the program checks whether a postSQL script <table_name>.sql for the source database exists in the <project_name>/postsql/ folder. If the script exists and preSQL was executed, the program executes the queries in the postSQL script; otherwise the program moves on.
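
The preSQL time-limit rule described above can be sketched as a small decision function. This is only an illustration of the stated behaviour, not the code in database_to_gcp.py: the function name and the last_run argument (the time the preSQL script last ran) are assumptions, since this README does not say how DBP records that time.

```python
from datetime import datetime, timedelta
from typing import Optional


def should_run_presql(last_run: Optional[datetime],
                      presql_lim_hrs: float = 0,
                      now: Optional[datetime] = None) -> bool:
    """Return True if the preSQL script should be executed again.

    last_run is hypothetical: the time the script last ran, or None.
    """
    now = now or datetime.now()
    # Flag missing or set to 0: the script always runs.
    if not presql_lim_hrs:
        return True
    # The script has never run, so there is nothing to wait for.
    if last_run is None:
        return True
    # Otherwise run only once the waiting period has elapsed.
    return now - last_run >= timedelta(hours=presql_lim_hrs)
```

For example, with presql_lim_hrs set to 6, a pipeline that is rerun two hours after a downstream failure would skip the preSQL script and go straight to the schema check.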

When the Parquet file is loaded into GCS:

  1. GCS's notification system sends a message to the PubSub topic containing the details of which file has been uploaded and where.
  2. PubSub has been configured to run a Cloud Function when the topic is triggered, and it passes the details on to the function.
  3. The function then uses those details to configure the BigQuery API call that tells BigQuery to create a table from the Parquet file.
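
As a rough sketch of step 3, the snippet below shows how a Pub/Sub-triggered function could recover the bucket and object name from a Cloud Storage notification sent in json format. The function name parse_gcs_notification and the example bucket and object names are made up for illustration; the deployed ingest_database_pipeline function may do this differently.

```python
import base64
import json


def parse_gcs_notification(event: dict) -> dict:
    """Extract the uploaded object's location from a Pub/Sub event."""
    # Pub/Sub delivers the notification body base64-encoded under "data".
    payload = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    bucket = payload["bucket"]
    name = payload["name"]
    return {"bucket": bucket, "name": name, "uri": f"gs://{bucket}/{name}"}


# Build a fake event locally to show the shape of the input.
body = json.dumps({"bucket": "my_project",
                   "name": "auto_data/trigger/my_table.json"})
example_event = {"data": base64.b64encode(body.encode("utf-8")).decode("ascii")}
details = parse_gcs_notification(example_event)
```

The resulting details can then be used to locate the trigger file and the Parquet files that BigQuery should load.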

Motivation

This section explains, for each step and tool used, the alternatives we considered and the problems we ran into that led to our final choice.

Why check for a schema change?

So that we are made aware of any changes to the table. This also ensures that the dashboards and models don't break when a column is renamed, added, removed or changed to a different data type.
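
A minimal sketch of such a check follows, assuming a schema can be represented as a mapping of column name to data type. That representation and both function names are assumptions for illustration; the README does not describe how the pipeline stores the saved schema.

```python
from typing import Dict, List, Optional


def describe_schema_changes(previous: Dict[str, str],
                            current: Dict[str, str]) -> List[str]:
    """List the differences between two column -> type mappings."""
    changes = []
    for col in sorted(set(previous) | set(current)):
        if col not in current:
            changes.append(f"removed: {col}")
        elif col not in previous:
            changes.append(f"added: {col}")
        elif previous[col] != current[col]:
            changes.append(
                f"type changed: {col} {previous[col]} -> {current[col]}")
    return changes


def schema_allows_run(previous: Optional[Dict[str, str]],
                      current: Dict[str, str],
                      change_approved: int = 0) -> bool:
    """Apply the rules: no saved schema, approved change, or exact match."""
    if previous is None:
        return True
    if change_approved == 1:
        return True
    return not describe_schema_changes(previous, current)
```

In this sketch a renamed column shows up as one removal plus one addition, which is enough to stop the run until change_approved is set to 1.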

Why use Sqoop to download the data and not the Spark JDBC connector?

Spark JDBC is very slow and needs a lot of configuration to come close to the download speed of Sqoop, which is made for downloading data from relational databases. Furthermore, Sqoop can be optimized specifically for the table to increase download speeds.

Why are you using the Spark JDBC connector to get the data types?

Sqoop changes the data types to improve the download speed, which means they no longer match the source table's original data types. Reading the types through the Spark JDBC connector ensures that we keep the same data types as the source tables.

Why do you need a trigger file?

When downloading via Sqoop, multiple Parquet files are generated in a folder. Cloud Storage notifications trigger once per file upload and uploading multiple files would cause multiple triggers. To get around this, we instead configure the notification to wait for a trigger file. This trigger file also allows us to pass additional information to the Cloud Function that gets triggered.
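
The ordering this implies can be sketched as follows: data files are uploaded first and the trigger file last, so the single notification only fires once everything is in place. The function name and the fields inside the trigger body are hypothetical; the auto_data/trigger/ prefix is the one used when the notification is created during setup.

```python
import json
from typing import List, Tuple


def plan_upload(parquet_files: List[str], table_name: str,
                trigger_prefix: str = "auto_data/trigger/"
                ) -> Tuple[List[str], str]:
    """Return (object names in upload order, trigger file body)."""
    data_files = sorted(parquet_files)
    trigger_name = f"{trigger_prefix}{table_name}.json"
    # Hypothetical trigger contents: extra details for the Cloud Function.
    trigger_body = json.dumps({"table": table_name,
                               "parquet_files": data_files})
    # The trigger file goes last so that only one notification is sent,
    # and only after all Parquet parts are already in the bucket.
    return data_files + [trigger_name], trigger_body
```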

First Time Setup

Note: if you don't have access to the service account file, you use your user account for GCP uploads. In that case the pydata-google-auth function will fail inside the program. You need to run the function once outside of the program for it to work inside the program.

Install pydata_google_auth:

sudo pip3 install pydata_google_auth

Type python3 in your terminal and hit enter, then copy and paste the following:

import pydata_google_auth
pydata_google_auth.get_user_credentials(['https://www.googleapis.com/auth/cloud-platform'])

The function requires you to log into your Google account.

General (All sources and destinations)

  1. Install the GCloud SDK and set it up with your user account
  2. Create the PubSub topic:
     gcloud pubsub topics create trigger-database-pipeline
    • Where trigger-database-pipeline is the name of the topic we want to create
  3. Deploy the Cloud Function
    • Change directory to the cloud_function folder:
      cd ./gcpsetup/cloud_function/
  4. Deploy the function:
     gcloud functions deploy ingest_database_pipeline --region europe-west2 --runtime python37 --trigger-topic trigger-database-pipeline
    • ingest_database_pipeline is the name of the function to call when the Cloud Function is triggered
    • europe-west2 is the region we want to deploy to (anything in EU is fine)
    • python37 indicates which environment the function requires
    • trigger-database-pipeline is the PubSub topic the function will subscribe to (see Writing a Cloud Function triggered by PubSub)
  5. The ingest_database_pipeline_workflow.yaml file in the repo:
    • Is used to set up a Dataproc cluster. In our case we set up the cluster to run a job and then close again. In the config file you need to specify the database type and the connection details to connect to the database.
  6. The trigger-database-pipeline.py file in the repo:
    • Is the code used to upload the data to any database that has a JDBC connector. This file needs to be in Cloud Storage in the location specified in the .yaml file above (or you need to change the location).

Config file examples

BigQuery as destination, Oracle as source
{
    "source": {
        "jdbc_uri": "jdbc:oracle:thin:@//<db_url>:1525/<oracle_db_name>",
        "user": "<source_db_username>",
        "password": "<source_db_password>",
        "change_approved": 0,
        "presql_lim_hrs": 0,
        "local_retention_days": 7,
        "remote_retention_days": 30,
        "remote_retention_tables": 10
    },
    "dest": [
        {
            "db_type": "bigquery"
        }
    ]
}
BigQuery as destination, MySQL as source
{
    "source": {
        "jdbc_uri": "jdbc:mysql://<db_url>/<mysql_db_name>",
        "user": "<source_db_username>",
        "password": "<source_db_password>",
        "change_approved": 0,
        "presql_lim_hrs": 0,
        "local_retention_days": 7,
        "remote_retention_days": 30,
        "remote_retention_tables": 10
    },
    "dest": [
        {
            "db_type": "bigquery"
        }
    ]
}
Postgres as destination, Oracle as source
{
    "source": {
        "jdbc_uri": "jdbc:oracle:thin:@//<db_url>:1525/<oracle_db_name>",
        "user": "<source_db_username>",
        "password": "<source_db_password>",
        "change_approved": 0,
        "presql_lim_hrs": 0,
        "local_retention_days": 7
    },
    "dest": [
        {
            "db_type": "postgres",
            "ip": "",
            "user": "",
            "password": ""
        }
    ]
}
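
Before running the pipeline it can help to sanity-check a config file. The sketch below loads a config like the examples above and fills in the documented defaults for the optional flags. Which keys are strictly required is an assumption inferred from the examples, and load_config is a hypothetical helper, not part of the repo.

```python
import json

# Assumption: these keys appear in every example above, so treat them as required.
REQUIRED_SOURCE_KEYS = ("jdbc_uri", "user", "password")


def load_config(text: str) -> dict:
    """Parse a config JSON string and check its basic shape."""
    config = json.loads(text)
    source = config.get("source", {})
    missing = [key for key in REQUIRED_SOURCE_KEYS if key not in source]
    if missing:
        raise ValueError(f"missing source keys: {missing}")
    if not config.get("dest"):
        raise ValueError("at least one destination is required")
    for dest in config["dest"]:
        if "db_type" not in dest:
            raise ValueError("each destination needs a db_type")
    # Optional flags: 0 means "no schema override" and "always run preSQL".
    source.setdefault("change_approved", 0)
    source.setdefault("presql_lim_hrs", 0)
    config["source"] = source
    return config
```

Retention keys are deliberately left untouched here, because omitting them means the data is kept until it is deleted manually.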

Setting up a new Pipeline

Prerequisites

  1. A PubSub topic to trigger on file upload
  2. The ingest_database_pipeline function deployed to Cloud Functions
  3. Spark, Sqoop and the GCloud SDK installed in WSL. Use the Engineering Env script from the "Dev Environments" repo to set up the environment
  4. A chosen <project_name> that will be used to house the config and name the destination bucket/dataset/database. The pipeline will use the folder you create in ./projects/ to infer the project name.
  5. Chosen <table_name>/s that will be the descriptive part of the name/s in the destination database. The pipeline will use the config file/s you create in ./projects/<project_name>/config/ to infer the table name/s.
  6. Source database requirements:
    • All databases:
      • Username
      • Password
    • Oracle:
    • MySQL:
      • Table names & SQL statements

Setup Process

BigQuery as the Destination

If GCS is the final destination, only do step 1 and then move on to Local Configurations.

  1. Create a GCS bucket named <project_name> in Europe West 3 (Frankfurt)
  2. Create a BigQuery dataset named <project_name> in the EU region
  3. Create a Cloud Storage notification that will trigger the PubSub topic once the trigger file finishes uploading.

gsutil notification create -t trigger-database-pipeline -f json -e OBJECT_FINALIZE -p auto_data/trigger/ gs://<project_name>

The options are configured as follows:

  • -t The PubSub topic to notify, in our case always trigger-database-pipeline
  • -f The format that we want the notification to be in for PubSub, always json
  • -e The event type filter. By default it sends notifications on any object action; we only want to notify when a file is added or overwritten, therefore always OBJECT_FINALIZE
  • -p The folder to watch. Notifications will only be sent for objects that have this prefix, always auto_data/trigger/
  • gs://<project_name> The bucket you set up for this pipeline

Local Configurations

  1. Create the following directories
    • ./projects/<project_name>
    • ./projects/<project_name>/config
    • ./projects/<project_name>/sql
    • ./projects/<project_name>/presql (optional)
    • ./projects/<project_name>/postsql (optional)
  2. Create one config file per table in ./projects/<project_name>/config/ as <table_name>.json. This file contains the configuration required to inform the pipeline of the source and destination (see examples).
  3. Create one SQL file per table in ./projects/<project_name>/sql/ as <table_name>.sql. The SQL file contains the statement that will be run by the pipeline to extract the data from the source database. This should ideally be a SELECT * for simplicity's sake, to avoid a "missing data" investigation.
  4. Optional: Create one preSQL file per table in ./projects/<project_name>/presql/ as <table_name>.sql. The preSQL file contains the statements that will be run by the prefixsql.py module to update the source database before extracting data. The statements should ideally be valid DML and DDL SQL queries.
  5. Optional: Create one postSQL file per table in ./projects/<project_name>/postsql/ as <table_name>.sql. The postSQL file contains the statements that will be run by the prefixsql.py module to update the source database after extracting data. The statements should ideally be valid DML and DDL SQL queries.
  6. Once you are happy with the configurations, push the repo to a new branch and submit a merge request.
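
The folder layout from step 1 can also be created with a few lines of Python. This is a convenience sketch only; scaffold_project is a hypothetical helper and is not part of the repo.

```python
from pathlib import Path
from typing import List


def scaffold_project(root: Path, project_name: str,
                     with_hooks: bool = False) -> List[Path]:
    """Create the project folders and return the paths that now exist."""
    base = root / "projects" / project_name
    folders = [base / "config", base / "sql"]
    if with_hooks:
        # presql and postsql are optional.
        folders += [base / "presql", base / "postsql"]
    for folder in folders:
        # parents=True also creates ./projects/<project_name> itself.
        folder.mkdir(parents=True, exist_ok=True)
    return folders
```

Calling scaffold_project(Path("."), "<project_name>", with_hooks=True) from the repo root would create all five directories listed in step 1.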

Using the Pipeline

Single Table

Call spark-submit passing the Python script and specifying the path to the config file for the table you want to run:

spark-submit database_to_gcp.py projects/<project_name>/config/<table_name>.json
Multi Table

You can run multiple tables by specifying the path to the config file for each table you want to run

spark-submit database_to_gcp.py projects/<project_name>/config/<table_name>.json projects/<project_name>/config/<table_name>.json
Entire Project

You can run all the tables in a project by specifying "*" as opposed to a single table when providing the path to the config file:

spark-submit database_to_gcp.py projects/<project_name>/config/*

Note: For the above to work you need a config file for the table/s as well as a sql file to fetch the data.
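
Because the project and table names are inferred from the config path, the relationship between the config file and its SQL file can be sketched as below. How database_to_gcp.py actually parses the path is not shown in this README, so infer_names and sql_path_for are hypothetical helpers that only illustrate the naming convention.

```python
from pathlib import PurePosixPath
from typing import Tuple


def infer_names(config_path: str) -> Tuple[str, str]:
    """Return (project_name, table_name) for a config file path."""
    path = PurePosixPath(config_path)
    parts = path.parts
    # Expected layout: .../projects/<project_name>/config/<table_name>.json
    if "projects" not in parts:
        raise ValueError(f"not inside a projects folder: {config_path}")
    idx = parts.index("projects")
    if (len(parts) != idx + 4 or parts[idx + 2] != "config"
            or path.suffix != ".json"):
        raise ValueError(f"unexpected config path layout: {config_path}")
    return parts[idx + 1], path.stem


def sql_path_for(config_path: str) -> str:
    """Return the SQL file that must exist alongside a config file."""
    project_name, table_name = infer_names(config_path)
    return f"projects/{project_name}/sql/{table_name}.sql"
```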

Anonymizing Data:

The data is anonymized using BigQuery. When the data gets anonymized, two tables are created. The <table>_anon table contains only the anonymized fields, and the <table>_anonmap table contains both the anonymized and original fields. The table containing both can be used to cross-reference values to make sure the dashboards are working as intended.

  • To anonymise the data, we use FARM_FINGERPRINT, which returns a hash value that cannot be directly reversed to the original value and always returns the same fingerprint for a specific value in the dataset. This makes it possible to join two separate tables even after the anonymisation.

Template of Anonymisation:

CREATE OR REPLACE TABLE `<project>.<dataset_lookups>.<table>_anonmap` AS
  WITH ORIGINAL AS
  (
    SELECT *
    FROM `<project>.<staging_dataset>.<table>`
  ),
  ANON AS
  (
    SELECT *,
           FARM_FINGERPRINT(<COLUMN>) AS <COLUMN>_ANON,
           ...
    FROM ORIGINAL
  ),
  MAP AS
  (
    SELECT DISTINCT
           <COLUMN>,
           ...
    FROM ANON
  )
  SELECT *
  FROM MAP;

CREATE OR REPLACE TABLE `<project>.<dataset>.<table>_anon` AS
  WITH ORIGINAL AS
  (
    SELECT *
    FROM `bcx-insights.edsa_projects2019.migratedproducts_20191016_00`
  ),
  ANON AS
  (
    SELECT * EXCEPT(SERVICE_NO,BAN,ACCOUNT_NO),
           FARM_FINGERPRINT(SERVICE_NO) AS SERVICE_NO_ANON,
           FARM_FINGERPRINT(BAN) AS BAN_ANON,
           FARM_FINGERPRINT(ACCOUNT_NO) AS ACCOUNT_NO_ANON
    FROM ORIGINAL
  )
  SELECT *
  FROM ANON
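
When many tables need the same treatment, the _anon statement can be generated rather than written by hand. The sketch below builds a simplified, single-SELECT version of the template above from a list of column names; build_anon_query is a hypothetical helper and the table names in the usage example are placeholders.

```python
from typing import List


def build_anon_query(source_table: str, dest_table: str,
                     columns: List[str]) -> str:
    """Build a CREATE OR REPLACE statement that fingerprints the columns."""
    except_list = ", ".join(columns)
    fingerprints = ",\n".join(
        f"         FARM_FINGERPRINT({col}) AS {col}_ANON" for col in columns
    )
    return (
        f"CREATE OR REPLACE TABLE `{dest_table}` AS\n"
        f"  SELECT * EXCEPT({except_list}),\n"
        f"{fingerprints}\n"
        f"  FROM `{source_table}`"
    )


query = build_anon_query("my_project.staging.customers",
                         "my_project.reporting.customers_anon",
                         ["SERVICE_NO", "BAN"])
```

Note that FARM_FINGERPRINT accepts STRING or BYTES input, so numeric columns would need a CAST to STRING before being fingerprinted.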

Retention

Retention of table data in local storage and on GCP is handled separately. Local retention is handled by DBP for each project_table being run. Remote retention is handled by a GCP Cloud Function and a Cloud Scheduler job through a Pub/Sub topic.

To enable retention, the config file specifies the following retention parameters as shown:

  • local_retention_days - maximum age of locally stored table data
  • remote_retention_days - maximum age of remote (GCS) table data
  • remote_retention_tables - minimum number of remote tables to keep regardless of retention days

Note:

  • If no values are specified for local or remote retention then all respective tables and files will be kept until manually deleted.
  • Tables and table data older than [current date - retention_days] will be permanently deleted.
  • The number of remote_retention_tables takes precedence over remote_retention_days.
  • At least one local table data file set will be retained.

{
    "source": {
        "jdbc_uri": "jdbc:oracle:thin:@//<db_url>:1525/<oracle_db_name>",
        "user": "<source_db_username>",
        "password": "<source_db_password>",
        "change_approved": 0,
        "presql_lim_hrs": 0,
        "local_retention_days": 7,
        "remote_retention_days": 30,
        "remote_retention_tables": 10
    },
    "dest": [
        {
            "db_type": "bigquery"
        }
    ]
}

Local Retention

When DBP is run for a project_table the local retention parameter is used to remove local versions of trigger and parquet files that are older than the retention date.

retention date = current date - retention days
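
A minimal sketch of this rule, including the guarantee that at least one local file set is kept, is shown below. The snapshots mapping (file set name to creation date) and the function name are assumptions for illustration; DBP's own bookkeeping is not described here.

```python
from datetime import date, timedelta
from typing import Dict, List, Optional


def local_files_to_delete(snapshots: Dict[str, date],
                          retention_days: Optional[int],
                          today: date) -> List[str]:
    """Return the names of local file sets older than the retention date."""
    # No retention configured: keep everything until manually deleted.
    if retention_days is None or not snapshots:
        return []
    retention_date = today - timedelta(days=retention_days)
    # Always keep the most recent file set, however old it is.
    newest = max(snapshots, key=snapshots.get)
    return sorted(name for name, created in snapshots.items()
                  if created < retention_date and name != newest)
```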

Remote Retention

Tables in GCP BigQuery and the related schema, trigger and parquet files in Storage older than a retention period are deleted. A minimum number of tables and files per project_table is specified along with the retention period in the config file.
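
The interaction between the retention period and the minimum table count can be sketched as follows. This illustrates the stated rule (the minimum count takes precedence over the age limit) and is not the code of the dbp_gcs_and_bq_clean function; the tables mapping and the function name are hypothetical.

```python
from datetime import date, timedelta
from typing import Dict, List, Optional


def remote_tables_to_delete(tables: Dict[str, date],
                            retention_days: Optional[int],
                            min_tables: int,
                            today: date) -> List[str]:
    """Return remote tables that are too old and not protected by the minimum."""
    # No retention configured: keep everything until manually deleted.
    if retention_days is None:
        return []
    retention_date = today - timedelta(days=retention_days)
    newest_first = sorted(tables, key=tables.get, reverse=True)
    # The newest min_tables tables are kept regardless of their age.
    protected = set(newest_first[:max(min_tables, 0)])
    return sorted(name for name in newest_first
                  if tables[name] < retention_date and name not in protected)
```

With remote_retention_days set to 30 and remote_retention_tables set to 10, a project_table that has only eight tables would lose none of them, even if all are older than 30 days.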

The GCP cleaning occurs in three steps:

  1. Cloud Scheduler runs the gcp-clean job every day at 17:00.
  2. The Pub/Sub topic trigger-dbp-gcs-and-bq-clean gets invoked by the Cloud Scheduler job gcp-clean.
  3. The Cloud Function dbp_gcs_and_bq_clean gets triggered by the Pub/Sub topic trigger-dbp-gcs-and-bq-clean.

To set up the three components, run the dbp_gcs_and_bq_clean_setup.sh script with the command:

sudo bash gcpsetup/dbp_gcs_and_bq_clean_setup.sh

You will be required to follow the authorization link, select/enter your Telkom email details, copy the verification code generated and paste it in the terminal.

The console commands used by the dbp_gcs_and_bq_clean_setup.sh script to create the GCP cleaning components are:

  1. Create the Pub/Sub topic:
     gcloud pubsub topics create trigger-dbp-gcs-and-bq-clean
    • trigger-dbp-gcs-and-bq-clean is the Pub/Sub topic name
  2. Deploy the Cloud Function
    • Change directory to the dbp_gcs_and_bq_clean folder:
      cd ./gcpsetup/dbp_gcs_and_bq_clean/
  3. Deploy the function:
     gcloud functions deploy dbp_gcs_and_bq_clean --region=europe-west2 --runtime=python37 --trigger-topic=trigger-dbp-gcs-and-bq-clean
    • dbp_gcs_and_bq_clean is the name of the function to call when the Cloud Function is triggered
    • europe-west2 is the region we want to deploy to (anything in EU is fine)
    • python37 indicates which environment the function requires
    • trigger-dbp-gcs-and-bq-clean is the PubSub topic the function will subscribe to (see Writing a Cloud Function triggered by PubSub)
  4. Create the Cloud Scheduler job:
     gcloud scheduler jobs create pubsub gcp-clean --schedule="0 17 * * *" --topic=trigger-dbp-gcs-and-bq-clean --message-body="Run dbp_gcs_and_bq_clean cloud function through pub/sub." --description="Starts the dbp_gcs_and_bq_clean cloud function every day at 17:00" --time-zone="Africa/Johannesburg"
    • gcp-clean is the name of the job
    • "0 17 * * *" is the unix-cron style frequency: every day at 17:00
    • trigger-dbp-gcs-and-bq-clean is the Pub/Sub topic to be invoked
    • "Africa/Johannesburg" is the time zone for the schedule time