Антон Поляков

Reconciliation of two data tables in Hadoop Hive


What is Data Reconciliation?

Data reconciliation (DR) is a term typically used to describe a verification phase during a data migration where the target data is compared against original source data to ensure that the migration architecture has transferred the data correctly.

During data migration, it is possible for mistakes to be made in the mapping and transformation logic. Also, runtime failures such as network dropouts or broken transactions can lead to data being left in an invalid state. These problems can lead to a range of issues such as:

  • Missing records
  • Missing values
  • Incorrect values
  • Duplicated records
  • Badly formatted values
  • Broken relationships across tables or systems

Reconciliation criteria

  • Keep checks as simple as possible
  • The result should be presented as a list (table) of "bad" data found in the checked tables (files)
  • The decision on whether to fix the bad data is made based on the output of this script
  • All checks should run fast

The structure of the reconciliation project

The project is a spark-submit job

Build package.zip

To make this project work, all dependencies need to be gathered into a single archive that is sent to all cluster nodes via the --py-files argument:

bash calcDQ/build.sh
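
The contents of build.sh are not shown here; conceptually it just bundles the policies and src packages into package.zip. A rough Python equivalent of that step could look like this (paths and names are assumptions, not the actual script):

import os
import zipfile

def build_package(root, out_path='package.zip'):
    # Pack the project's Python packages into one zip archive
    # so they can be shipped to executors via --py-files
    with zipfile.ZipFile(out_path, 'w') as zf:
        for pkg in ('policies', 'src'):
            for dirpath, _, filenames in os.walk(os.path.join(root, pkg)):
                for name in filenames:
                    if name.endswith('.py'):
                        full = os.path.join(dirpath, name)
                        zf.write(full, os.path.relpath(full, root))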

Run a job

  • src - source table (file)
  • dst - destination table (file)
  • j - JSON config

spark-submit --master yarn \
--py-files recon/package.zip \
recon/run.py -src sandbox_apolyakov.recon_source -dst sandbox_apolyakov.recon_dest -j recon/table_name_recon_config.json

Recon package checks

policies.validation.int_validator - Compares values of int type; takes an optional tolerance argument

policies.validation.string_validator - Compares values of string type

policies.validation.date_validator - Compares values of date type
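
For illustration, minimal versions of such validators might look like this (assuming an absolute tolerance; the real implementations in policies.validation may differ):

def int_validator(src_value, dst_value, tolerance=0):
    # Values are considered equal when they differ by no more than the tolerance
    if src_value is None or dst_value is None:
        return src_value is None and dst_value is None
    return abs(src_value - dst_value) <= tolerance

def string_validator(src_value, dst_value):
    # Exact string comparison
    return src_value == dst_value

def date_validator(src_value, dst_value):
    # Dates are compared for exact equality
    return src_value == dst_value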

Package file structure


Entry point - run.py

  • build.sh - Run it every time the policies or src folders are updated
  • table_name_recon_config.json - Config file that describes how to compare the two data sets
  • policies.validation - Validations
  • src.recon - Application source code

An example of the configuration JSON file table_name_recon_config.json:

{
  "key": "id",
  "col_1": {
    "dst_column_name": "name",
    "src_column_name": "name",
    "validation": {
      "properties": {},
      "validationMethod": "string_validator"
    }
  },
  "col_2": {
    "dst_column_name": "age",
    "src_column_name": "vozrast",
    "validation": {
      "properties": {
        "tolerance": 0.1
      },
      "validationMethod": "int_validator"
    }
  },
  "col_3": {
    "dst_column_name": "day",
    "src_column_name": "day",
    "validation": {
      "properties": {},
      "validationMethod": "date_validator"
    }
  }
}

  • key - The name of the unique ID (identity) column. It must have the same name in both tables.
  • col_* - A placeholder entry for every column to be checked
  • dst_column_name - The column name in the destination table (file)
  • src_column_name - The column name in the source table (file)
  • validation - Validation options for a column
  • properties - Validation parameters
  • validationMethod - Validation method
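
To illustrate how this config is consumed, loading and iterating it could look roughly like the following sketch (illustrative, not the actual run.py source):

import json

with open('recon/table_name_recon_config.json') as f:
    config = json.load(f)

key_column = config.pop('key')            # 'id' - the join key
for col_id, col_conf in config.items():   # 'col_1', 'col_2', ...
    src_col = col_conf['src_column_name']
    dst_col = col_conf['dst_column_name']
    method = col_conf['validation']['validationMethod']  # e.g. 'int_validator'
    params = col_conf['validation']['properties']        # e.g. {'tolerance': 0.1}
    # resolve the validator by name from policies.validation and apply it
    # to every (src_col, dst_col) pair after the tables are joined on key_column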

How it works

  1. Get the data from the tables passed in the script arguments src and dst
  2. Validation policies are applied to every column of every row
  3. The data that did not pass validation is stored in a dataframe that can later be written to any database or file

The reconciliation process does not affect the data ingestion and data integration processes. It runs in parallel with the existing processes and helps engineers measure the quality of the data ingestion.
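
A simplified PySpark sketch of this flow (names and the output schema are illustrative; the real run.py is driven by the JSON config):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName('recon').getOrCreate()

# 1. Get the data from the src and dst tables passed as arguments
src = spark.table('sandbox_apolyakov.recon_source').alias('src')
dst = spark.table('sandbox_apolyakov.recon_dest').alias('dst')

# Join on the key so every row can be compared column by column
joined = src.join(dst, F.col('src.id') == F.col('dst.id'), 'full_outer')

# 2. Apply a validation policy to each configured column pair
#    (here: vozrast vs age with tolerance 0.1, as in the config above)
bad_age = (joined
           .where(F.abs(F.col('src.vozrast') - F.col('dst.age')) > 0.1)
           .select(F.col('src.id').alias('key'),
                   F.lit('age').alias('column'),
                   F.col('src.vozrast').cast('string').alias('src_value'),
                   F.col('dst.age').cast('string').alias('dst_value')))

# 3. Bad rows from all column checks are unioned into one dataframe
#    that can later be written to any database or file, for example:
# bad_rows = bad_age.union(bad_name).union(bad_day)
# bad_rows.write.mode('overwrite').saveAsTable('sandbox_apolyakov.recon_report')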

Init code

Init test data for Hive

drop table if exists sandbox_apolyakov.recon_source;
create table sandbox_apolyakov.recon_source (
id int,
name string,
vozrast int,
day date
);
insert into sandbox_apolyakov.recon_source values
(1, 'Dow', 33, '2018-07-13'),
(2, 'Pits', 122, '1917-04-12'),
(3, 'Chris', 38, '2001-05-26'),
(4, 'James', 21, '2001-05-26'),
(5, 'Penelopa', 38, '2001-05-30');

drop table if exists sandbox_apolyakov.recon_dest;
create table sandbox_apolyakov.recon_dest (
id int,
name string,
age int,
day date
);
insert into sandbox_apolyakov.recon_dest values
(1, 'Dow', 33, '2018-07-13'), -- valid
(2, 'Pits', 120, '1917-04-10'), -- invalid age (122 -> 120) and date (1917-04-12 -> 1917-04-10)
(3, 'Charles', 38, '2001-05-26'), -- invalid name (Chris -> Charles)
(4, 'James', 200, '2001-05-26'), -- invalid age (21 -> 200)
(55, 'Penelopa', 38, '2001-05-10') -- invalid date (2001-05-30 -> 2001-05-10) and Index
;
Tests were performed on Hortonworks Hadoop 2.6.5, Spark 2.3.1, and Python 2.7.

Example

An example of a full join operation with the given src and dst data sources
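
For the test data above, the join can be inspected directly in pyspark (illustrative snippet):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
src = spark.table('sandbox_apolyakov.recon_source').alias('src')
dst = spark.table('sandbox_apolyakov.recon_dest').alias('dst')

src.join(dst, F.col('src.id') == F.col('dst.id'), 'full_outer') \
   .orderBy(F.col('src.id')) \
   .show(truncate=False)
# ids 1-4 are matched pairs; id=5 exists only in recon_source and id=55
# only in recon_dest, so the opposite side of those rows is all nulls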

The results of a job

A new row is created for every invalid column of every row.

If a row with 3 invalid columns is reconciled, the result will contain 3 rows, each describing one failed check between the src and dst tables (files).

The code and README.md can be found on GitHub.