Database Pipeline
About
The database pipeline allows us to copy data from one "database" to another. It primarily exists to extract data from Telkom's EDW environment (Oracle). Tools are available to refresh the tables in the source database before pulling data, as well as retention features for BigQuery and GCS. The pipeline currently supports the following:
* Sources
  * Oracle via OJDBC
  * MySQL via JDBC
* Destinations
  * BigQuery via uploading a Parquet to GCS
Architecture

The pipeline uses Apache Sqoop to connect to the source and download the data, Spark to process the data on the local machine, and GCS to stage the resulting Parquet file, which, once uploaded, is automatically ingested into BigQuery according to the specified configuration.

Detailed description of database_to_gcp.py
When run, database_to_gcp.py does the following:
1. The program first checks whether it should refresh data in the source database. It looks for a `<table_name>.sql` preSQL script in the `<project_name>/presql/` folder. If the script exists, the program executes the queries in the script; otherwise the program moves on. A `presql_lim_hrs` flag can be added to config.json to indicate the time in hours DBP should wait before running the preSQL script again. This is useful for those big preSQL scripts you don't want to accidentally re-run if the pipeline breaks downstream. To set the preSQL time limit, include the following under `source` in config.json:
```json
"presql_lim_hrs": 0
```
When a preSQL script is available, preSQL will always execute if presql_lim_hrs is not specified in the config file or is set to 0.
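To make the presql_lim_hrs behaviour concrete, the check can be thought of as comparing the age of a "last run" marker against the configured limit. The sketch below is illustrative only, not the actual DBP implementation; the marker file and the helper names `should_run_presql`/`record_presql_run` are hypothetical.

```python
import json
import time
from pathlib import Path


def should_run_presql(config_path: str, marker_path: str) -> bool:
    """Illustrative only: run preSQL if no limit is set, or if the last
    recorded run is older than presql_lim_hrs (marker path is hypothetical)."""
    with open(config_path) as fh:
        limit_hrs = json.load(fh)["source"].get("presql_lim_hrs", 0)

    marker = Path(marker_path)
    if limit_hrs == 0 or not marker.exists():
        return True  # no limit configured, or preSQL has never run before

    age_hrs = (time.time() - marker.stat().st_mtime) / 3600
    return age_hrs >= limit_hrs


def record_presql_run(marker_path: str) -> None:
    """Touch the marker file to record when preSQL last ran."""
    Path(marker_path).touch()
```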
2. The program checks the schema of the source table against a previously saved schema to see whether they match.
   - If there is no previous schema, the schema checker will allow the program to continue.
   - If a `change_approved` flag has been added to the `source` key in the JSON and is set to 1, the program will continue regardless of changes to the schema (`"change_approved": 0` is the default).
   - If the above conditions are not met and the schema does not match the previous schema, the program will stop.
3. If the schema is approved, the program uses Sqoop to download the data from Oracle to a local Parquet file.
4. Once the download is complete, the program uses a Spark JDBC connection to pull the correct data types from the Oracle table and converts the Parquet columns to those types instead (see the sketch after this list).
5. The program uploads the Parquet files and trigger file to Google Cloud Storage.
6. Lastly, the program checks whether a postSQL script `<table_name>.sql` for the source database exists in the `<project_name>/postsql/` folder. If the script exists and preSQL was executed, the program executes the queries in the postSQL script; otherwise the program moves on.
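As a rough illustration of steps 3 to 5, the sketch below shows how the type alignment and re-write might look in PySpark. It is not the actual database_to_gcp.py code: the local paths and JDBC options are placeholders, and it assumes the Oracle JDBC driver is on the Spark classpath.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("dbp_type_alignment_sketch").getOrCreate()

# Parquet files previously downloaded by Sqoop (path is a placeholder)
parquet_df = spark.read.parquet("/tmp/sqoop_output/<table_name>")

# Read zero rows over JDBC purely to obtain the source table's column types
jdbc_schema = (
    spark.read.format("jdbc")
    .option("url", "jdbc:oracle:thin:@//<db_url>:1525/<oracle_db_name>")
    .option("dbtable", "(SELECT * FROM <table_name> WHERE 1=0) tmp")
    .option("user", "<source_db_username>")
    .option("password", "<source_db_password>")
    .load()
    .schema
)

# Cast each Parquet column to the data type reported by the source database
for field in jdbc_schema.fields:
    if field.name in parquet_df.columns:
        parquet_df = parquet_df.withColumn(field.name, col(field.name).cast(field.dataType))

# Write the typed Parquet, ready to be uploaded to GCS together with the trigger file
parquet_df.write.mode("overwrite").parquet("/tmp/typed_output/<table_name>")
```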
When the Parquet file is loaded into GCS:
1. GCS's notification system sends a message to the PubSub topic containing the details of where and what file has been uploaded.
2. PubSub has been configured to run a Cloud Function when the topic is triggered, and passes the details on to the function.
3. The function then uses those details to configure a BigQuery API load job that tells BigQuery to create a table from the Parquet.
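For context, here is a minimal sketch of what such a PubSub-triggered ingest function could look like. It is illustrative rather than the deployed ingest_database_pipeline code; in particular, the data/ prefix and the dataset naming convention are assumptions.

```python
import base64
import json

from google.cloud import bigquery


def ingest_database_pipeline(event, context):
    """Sketch of a PubSub-triggered background function that loads the
    uploaded Parquet into BigQuery (illustrative, not the deployed code)."""
    # The GCS notification arrives base64-encoded in the PubSub message body
    notification = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    bucket = notification["bucket"]
    trigger_path = notification["name"]  # e.g. auto_data/trigger/<table_name>

    # Assumption: Parquet parts live next to the trigger file under a data/ prefix
    table_name = trigger_path.rstrip("/").split("/")[-1]
    source_uri = f"gs://{bucket}/auto_data/data/{table_name}/*.parquet"

    client = bigquery.Client()
    dataset = bucket.replace("-", "_")  # assumption: dataset named after the bucket
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    job = client.load_table_from_uri(
        source_uri, f"{client.project}.{dataset}.{table_name}", job_config=job_config
    )
    job.result()  # wait for BigQuery to finish creating/replacing the table
```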
Motivation
This section explains, for each step and each tool used, the alternatives considered and the problems we ran into that led to our final choice.
Why check for a schema change?
So that we are made aware of any changes to the table. This also ensures that the dashboards and models don't break when a column is renamed, added, removed or changed to a different data type.
Why use Sqoop to download the data and not the Spark JDBC connector?
Spark JDBC is very slow and takes a lot of configuring to match the download speed of Sqoop, which is purpose-built for downloading data from relational databases. Furthermore, Sqoop can be optimised specifically for the table to increase download speeds.
Why are you using the Spark JDBC connector to get the data types?
Sqoop changes the data types to improve the download speed, which means they no longer match the source table's original data types. Using the Spark JDBC connector to read the types ensures we keep the same data types as the source tables.
Why do you need a trigger file?
When downloading via Sqoop, multiple Parquet files are generated in a folder. Cloud Storage notifications trigger once per file upload and uploading multiple files would cause multiple triggers. To get around this, we instead configure the notification to wait for a trigger file. This trigger file also allows us to pass additional information to the Cloud Function that gets triggered.
First Time Setup
Note: when using your user account for GCP uploads (if you don't have access to the service account file, you use your user account), the pydata-google-auth function will fail inside the program. You need to run the function once outside of the program for it to work inside the program.
Install pydata_google_auth:
```shell
sudo pip3 install pydata_google_auth
```
Type python3 in your terminal and hit enter, then copy-paste the following:
```python
import pydata_google_auth
pydata_google_auth.get_user_credentials(['https://www.googleapis.com/auth/cloud-platform'])
```
The function requires you to log into your Google account.
General (All sources and destinations)
- Install the GCloud SDK and set it up with your user account
- Create the PubSub topic:
  ```shell
  gcloud pubsub topics create trigger-database-pipeline
  ```
  - Where `trigger-database-pipeline` is the name of the topic we want to create
- Deploy the Cloud Function
  - Change directory to the `cloud_function` folder
    ```shell
    cd ./gcpsetup/cloud_function/
    ```
  - Deploy the function
    ```shell
    gcloud functions deploy ingest_database_pipeline --region europe-west2 --runtime python37 --trigger-topic trigger-database-pipeline
    ```
    - `ingest_database_pipeline` is the name of the function to call when the Cloud Function is triggered
    - `europe-west2` is the region we want to deploy to (anything in the EU is fine)
    - `python37` indicates which environment the function requires
    - `trigger-database-pipeline` is the PubSub topic the function will subscribe to (Writing a Cloud Function triggered by PubSub)
- The ingest_database_pipeline_workflow.yaml file in the repo:
  - Is used to set up a Dataproc cluster. In our case we set up the cluster to run a job and then shut down again. In the config file you need to specify the database type and the connection details to connect to the database.
- The trigger-database-pipeline.py file in the repo:
  - Is the code used to upload the data to any database that has a JDBC connector. This file needs to be in Cloud Storage in the location specified in the .yaml file above (or you need to change the location).
Config file examples
BigQuery as destination, Oracle as source
```json
{
  "source": {
    "jdbc_uri": "jdbc:oracle:thin:@//<db_url>:1525/<oracle_db_name>",
    "user": "<source_db_username>",
    "password": "<source_db_password>",
    "change_approved": 0,
    "presql_lim_hrs": 0,
    "local_retention_days": 7,
    "remote_retention_days": 30,
    "remote_retention_tables": 10
  },
  "dest": [
    {
      "db_type": "bigquery"
    }
  ]
}
```
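To show how a config like the one above could drive the Sqoop download, here is a hedged sketch that builds a sqoop import call from the config and the table's SQL file. It is not the actual database_to_gcp.py logic; the target directory, the single-mapper choice and the assumption that the SQL file contains no WHERE clause of its own are all illustrative.

```python
import json
import subprocess


def sqoop_import(config_path: str, sql_path: str, target_dir: str) -> None:
    """Illustrative sketch: build a sqoop import from the table config and SQL file."""
    with open(config_path) as fh:
        source = json.load(fh)["source"]
    with open(sql_path) as fh:
        # Sqoop free-form queries must contain the literal token $CONDITIONS.
        # Assumption: the SQL file is a plain SELECT * with no WHERE clause of its own.
        query = fh.read().strip().rstrip(";") + " WHERE $CONDITIONS"

    cmd = [
        "sqoop", "import",
        "--connect", source["jdbc_uri"],
        "--username", source["user"],
        "--password", source["password"],
        "--query", query,
        "--target-dir", target_dir,
        "--as-parquetfile",
        "-m", "1",               # single mapper; tune --split-by for large tables
        "--delete-target-dir",
    ]
    subprocess.run(cmd, check=True)
```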
BigQuery as destination, MySQL as source
```json
{
  "source": {
    "jdbc_uri": "jdbc:mysql://<db_url>/<mysql_db_name>",
    "user": "<source_db_username>",
    "password": "<source_db_password>",
    "change_approved": 0,
    "presql_lim_hrs": 0,
    "local_retention_days": 7,
    "remote_retention_days": 30,
    "remote_retention_tables": 10
  },
  "dest": [
    {
      "db_type": "bigquery"
    }
  ]
}
```
Postgres as destination, Oracle as source
```json
{
  "source": {
    "jdbc_uri": "jdbc:oracle:thin:@//<db_url>:1525/<oracle_db_name>",
    "user": "<source_db_username>",
    "password": "<source_db_password>",
    "change_approved": 0,
    "presql_lim_hrs": 0,
    "local_retention_days": 7
  },
  "dest": [
    {
      "db_type": "postgres",
      "ip": "",
      "user": "",
      "password": ""
    }
  ]
}
```
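For the Postgres destination, the upload described earlier (trigger-database-pipeline.py running on Dataproc) amounts to a Spark JDBC write. The following is a minimal sketch under assumptions, not the actual script: the GCS path and connection details are placeholders, and it presumes the Postgres JDBC driver and GCS connector are available on the cluster.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dbp_postgres_dest_sketch").getOrCreate()

# Parquet produced by the pipeline (path is a placeholder)
df = spark.read.parquet("gs://<project_name>/auto_data/data/<table_name>/")

# Write to the Postgres destination described by the "dest" entry in the config
(
    df.write.format("jdbc")
    .option("url", "jdbc:postgresql://<ip>:5432/<database>")
    .option("dbtable", "<table_name>")
    .option("user", "<dest_db_username>")
    .option("password", "<dest_db_password>")
    .option("driver", "org.postgresql.Driver")
    .mode("overwrite")
    .save()
)
```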
Setting up a new Pipeline
Prerequisites
- A PubSub topic to trigger on file upload
- The `ingest_database_pipeline` function deployed to Cloud Functions
- Spark, Sqoop and the GCloud SDK installed in WSL. Use the Engineering Env script from the "Dev Environments" repo to get the environment set up
- A chosen `<project_name>` that will be used to house the config and name the destination bucket/dataset/database. The pipeline will use the folder you create in `./projects/` to infer the project name.
- Chosen `<table_name>`/s that will be the descriptive part of the name/s in the destination database. The pipeline will use the config file/s you create in `./projects/<project_name>/config/` to infer the table name/s.
- Source database requirements:
Setup Process
BigQuery as the Destination
If GCS is the final destination, only do step 1 and then move on to Local Configurations
1. Create a GCS bucket named `<project_name>`
2. Create the notification configuration on the bucket:
   ```shell
   gsutil notification create -t trigger-database-pipeline -f json -e OBJECT_FINALIZE -p auto_data/trigger/ gs://<project_name>
   ```
The options are configured as follows:
- `-t` The PubSub topic to notify, in our case always trigger-database-pipeline
- `-f` The format that we want the notification to be in for PubSub, always json
- `-e` The event type filter; by default notifications are sent on any object action, but we only want to notify when a file is added or overwritten, therefore always OBJECT_FINALIZE
- `-p` The folder to watch; notifications will only be sent for objects that have this prefix, always auto_data/trigger/
- `gs://<project_name>` The bucket you set up for this pipeline
Local Configurations
- Create the following directories
  - `./projects/<project_name>`
  - `./projects/<project_name>/config`
  - `./projects/<project_name>/sql`
  - `./projects/<project_name>/presql` (optional)
  - `./projects/<project_name>/postsql` (optional)
- Create one config file per table in `./projects/<project_name>/config/` as `<table_name>.json`. This file contains the various configurations required to inform the pipeline of the source and destination (see examples).
- Create one SQL file per table in `./projects/<project_name>/sql/` as `<table_name>.sql`. The SQL file contains the statement that will be run by the pipeline to extract the data from the source database. This should ideally be a `SELECT *` for simplicity's sake, to avoid a "missing data" investigation.
- Optional: Create one preSQL file per table in `./projects/<project_name>/presql/` as `<table_name>.sql`. The preSQL file contains the statements that will be run by the `prefixsql.py` module to update the source database *before* extracting data. The statements should ideally be valid DML and DDL SQL queries.
- Optional: Create one postSQL file per table in `./projects/<project_name>/postsql/` as `<table_name>.sql`. The postSQL file contains the statements that will be run by the `prefixsql.py` module to update the source database *after* extracting data. The statements should ideally be valid DML and DDL SQL queries.
- Once you are happy with the configurations, push the repo to a new branch and submit a merge request.
Using the Pipeline
Single Table
Call spark-submit passing the Python script and specifying the path to the config file for the table you want to run:
```shell
spark-submit database_to_gcp.py projects/<project_name>/config/<table_name>.json
```
Multi Table
You can run multiple tables by specifying the path to the config file for each table you want to run:
```shell
spark-submit database_to_gcp.py projects/<project_name>/config/<table_name>.json projects/<project_name>/config/<table_name>.json
```
Entire Project
You can run all the tables in a project by specifying "*" as opposed to a single table when providing the path to the config file:
```shell
spark-submit database_to_gcp.py projects/<project>/config/*
```
Note: For the above to work you need a config file for the table/s as well as a sql file to fetch the data.
Anonymizing Data
The data is anonymized using BigQuery. When the data gets anonymized, two tables are created: the <table>_anon table contains only the anonymized versions of the sensitive fields, while <table>_anonmap contains both the anonymized and original fields. The table containing both can be used to cross-reference values to make sure the dashboards are working as intended.
- To anonymise the data, we use FARM_FINGERPRINT, which returns a value that cannot be reverse-engineered back to the original value and always returns the same fingerprint for a specific value in the dataset. This makes it possible to join two separate tables even after the anonymisation.
Template of Anonymisation:
```sql
CREATE OR REPLACE TABLE `<project>.<dataset_lookups>.<table>_anonmap` AS
WITH ORIGINAL AS
(
  SELECT *
  FROM `<project>.<staging_dataset>.<table>`
),
ANON AS
(
  SELECT *,
    FARM_FINGERPRINT(<COLUMN>) AS <COLUMN>_ANON,
    ...
  FROM ORIGINAL
),
MAP AS
(
  SELECT DISTINCT
    <COLUMN>,
    ...
  FROM ANON
)
SELECT *
FROM MAP
```
```sql
CREATE OR REPLACE TABLE `<project>.<dataset>.<table>_anon` AS
WITH ORIGINAL AS
(
  SELECT *
  FROM `bcx-insights.edsa_projects2019.migratedproducts_20191016_00`
),
ANON AS
(
  SELECT * EXCEPT(SERVICE_NO, BAN, ACCOUNT_NO),
    FARM_FINGERPRINT(SERVICE_NO) AS SERVICE_NO_ANON,
    FARM_FINGERPRINT(BAN) AS BAN_ANON,
    FARM_FINGERPRINT(ACCOUNT_NO) AS ACCOUNT_NO_ANON
  FROM ORIGINAL
)
SELECT *
FROM ANON
```
Retention
Retention of table data in local storage and on GCP is handled separately. Local retention is handled by DBP for each project_table being run. Remote retention is handled by a GCP Cloud Function and a Cloud Scheduler job through a Pub/Sub topic.
The config file must specify the following retention parameters as shown:
* local_retention_days - maximum age of locally stored table data
* remote_retention_days - maximum age of remote (GCS) table data
* remote_retention_tables - minimum number of remote tables to keep regardless of retention days
Note:
* If no values are specified for local or remote retention then all respective tables and files will be kept until manually deleted.
* Tables and table data older than [current date - retention_days] will be permanently deleted.
* The number of remote_retention_tables takes precedence over remote_retention_days.
* At least one local table data file set will be retained.
```json
{
  "source": {
    "jdbc_uri": "jdbc:oracle:thin:@//<db_url>:1525/<oracle_db_name>",
    "user": "<source_db_username>",
    "password": "<source_db_password>",
    "change_approved": 0,
    "presql_lim_hrs": 0,
    "local_retention_days": 7,
    "remote_retention_days": 30,
    "remote_retention_tables": 10
  },
  "dest": [
    {
      "db_type": "bigquery"
    }
  ]
}
```
Local Retention
When DBP is run for a project_table the local retention parameter is used to remove local versions of trigger and parquet files that are older than the retention date.
retention date = current date - retention days
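A hedged sketch of that rule in plain Python (illustrative only; the local directory layout is an assumption, and the newest file is kept to honour the "at least one file set" note above):

```python
import time
from pathlib import Path


def clean_local(table_dir: str, local_retention_days: int) -> None:
    """Delete local trigger/Parquet files older than the retention date,
    always keeping the newest file (directory layout is hypothetical)."""
    cutoff = time.time() - local_retention_days * 24 * 3600
    files = sorted(
        (p for p in Path(table_dir).rglob("*") if p.is_file()),
        key=lambda p: p.stat().st_mtime,
    )
    if not files:
        return
    for path in files[:-1]:  # files[-1] is the newest file, which is always kept
        if path.stat().st_mtime < cutoff:
            path.unlink()
```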
Remote Retention
Tables in GCP BigQuery and the related schema, trigger and parquet files in Storage older than a retention period are deleted. A minimum number of tables and files per project_table is specified along with the retention period in the config file.
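The selection rule itself can be sketched independently of the GCP APIs. The function below is illustrative, not the dbp_gcs_and_bq_clean source; it shows how the newest remote_retention_tables entries are protected before the age check is applied (i.e. the count takes precedence over remote_retention_days):

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def tables_to_delete(
    tables: List[Tuple[str, datetime]],  # (table_name, created) pairs
    remote_retention_days: int,
    remote_retention_tables: int,
) -> List[str]:
    """Names eligible for deletion: older than the retention date, excluding the
    newest remote_retention_tables entries (the count takes precedence)."""
    cutoff = datetime.utcnow() - timedelta(days=remote_retention_days)
    newest_first = sorted(tables, key=lambda t: t[1], reverse=True)
    protected = {name for name, _ in newest_first[:remote_retention_tables]}
    return [name for name, created in tables if created < cutoff and name not in protected]
```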
The GCP cleaning occurs in three steps:
1. Cloud Scheduler runs the gcp-clean job every day at 17:00.
2. The Pub/Sub topic trigger-dbp-gcs-and-bq-clean gets invoked by the Cloud Scheduler job gcp-clean.
3. The Cloud Function dbp_gcs_and_bq_clean gets triggered by the Pub/Sub topic trigger-dbp-gcs-and-bq-clean.
To set up the three components, run the dbp_gcs_and_bq_clean_setup.sh script with the command:
```shell
sudo bash gcpsetup/dbp_gcs_and_bq_clean_setup.sh
```
You will be required to follow the authorization link, select/enter your Telkom email details, copy the verification code generated and paste it in the terminal.
The console commands used by the dbp_gcs_and_bq_clean_setup.sh script to create the GCP cleaning components are:
- Create the Pub/Sub topic
  ```shell
  gcloud pubsub topics create trigger-dbp-gcs-and-bq-clean
  ```
  - `trigger-dbp-gcs-and-bq-clean` is the Pub/Sub topic name
- Change directory to the `dbp_gcs_and_bq_clean` folder
  ```shell
  cd ./gcpsetup/dbp_gcs_and_bq_clean/
  ```
- Deploy the function
  ```shell
  gcloud functions deploy dbp_gcs_and_bq_clean --region=europe-west2 --runtime=python37 --trigger-topic=trigger-dbp-gcs-and-bq-clean
  ```
  - `dbp_gcs_and_bq_clean` is the name of the function to call when the Cloud Function is triggered
  - `europe-west2` is the region we want to deploy to (anything in the EU is fine)
  - `python37` indicates which environment the function requires
  - `trigger-dbp-gcs-and-bq-clean` is the PubSub topic the function will subscribe to (Writing a Cloud Function triggered by PubSub)
- Create the Cloud Scheduler job
  ```shell
  gcloud scheduler jobs create pubsub gcp-clean --schedule="0 17 * * *" --topic=trigger-dbp-gcs-and-bq-clean --message-body="Run dbp_gcs_and_bq_clean cloud function through pub/sub." --description="Starts the dbp_gcs_and_bq_clean cloud function every day at 17:00" --time-zone="Africa/Johannesburg"
  ```
  - `gcp-clean` is the name of the job
  - `"0 17 * * *"` is the unix-cron style frequency: every day at 17:00
  - `trigger-dbp-gcs-and-bq-clean` is the Pub/Sub topic to be invoked
  - `"Africa/Johannesburg"` is the time zone for the schedule time