Bigmetadata ETL Docs¶
All data for CARTO’s Data Observatory is obtained through tasks built by subclassing Bigmetadata ETL classes.
The classes themselves are derived from Luigi tasks.
By performing the ETL using these classes, we gain a few guarantees:
- Reproducibility and avoidance of duplicate work
- Generation of high-quality metadata consumable by the Observatory API
- Scalability across multiple processes
Contents:
Quickstart¶
Requirements¶
You’ll need:
- git
- docker 1.13.1+
- docker-compose 1.18.0+
You should also install make to get access to convenience commands, if you don’t have it already.
You’ll want at least 2GB of memory available on the host machine.
You’ll want at least 30GB of disk space available on the host machine to work comfortably with data and get everything running. If you want to install an existing database dump, you will need more like 120GB of space. If you want to install, say, the entire American Community Survey, you will want more like 1TB of space.
Clone & configure¶
Once your prerequisites are set up, clone the repo:
git clone https://github.com/cartodb/bigmetadata.git
cd bigmetadata
touch .env
The last line sets up an empty configuration. If you want to upload to your CARTO account from the ETL, you'll then need to configure CARTODB_API_KEY and CARTODB_URL in the .env file.
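For example, a minimal .env for uploading to CARTO might contain just these two lines (placeholder values shown):
CARTODB_API_KEY=your_api_key
CARTODB_URL=https://username.carto.com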
If you're on Linux instead of Mac, you may want to give your existing user docker privileges (which are equivalent to root):
sudo gpasswd -a $(whoami) docker
Then log out and log back in.
Start¶
Before running tasks the first time, you’ll need to download and start the containers.
docker-compose up -d
Once the containers are up, you need to confirm that the Postgres container has started.
make psql
This will attempt to launch an interactive session with Postgres inside the container. If it doesn't work, wait a little bit and try again. The database takes some time to get running initially.
Run¶
You now should be able to run a task.
make -- run es.ine.FiveYearPopulation
Note
The first time you run it, that command will download a few Docker images. Depending on the speed of your connection, it could take ten or fifteen minutes. Grab a coffee!
That will run FiveYearPopulation. This includes downloading all the source data files if they don't already exist locally, and generating all the metadata necessary to make this dataset work with observatory-extension functions.
You can take a look at the data:
make psql
gis=# select count(*) from observatory.obs_column;
count
-------
169
(1 row)
gis=# select id, name, type, aggregate from observatory.obs_column where name ilike 'population%';
id | name | type | aggregate
-------------------------+----------------------------+---------+-----------
es.ine.pop_0_4 | Population age 0 to 4 | Numeric | sum
es.ine.pop_5_9 | Population age 5 to 9 | Numeric | sum
es.ine.pop_10_14 | Population age 10 to 14 | Numeric | sum
es.ine.pop_15_19 | Population age 15 to 19 | Numeric | sum
es.ine.pop_20_24 | Population age 20 to 24 | Numeric | sum
es.ine.pop_25_29 | Population age 25 to 29 | Numeric | sum
es.ine.pop_30_34 | Population age 30 to 34 | Numeric | sum
es.ine.pop_35_39 | Population age 35 to 39 | Numeric | sum
es.ine.pop_40_44 | Population age 40 to 44 | Numeric | sum
es.ine.pop_45_49 | Population age 45 to 49 | Numeric | sum
es.ine.pop_50_54 | Population age 50 to 54 | Numeric | sum
es.ine.pop_55_59 | Population age 55 to 59 | Numeric | sum
es.ine.pop_60_64 | Population age 60 to 64 | Numeric | sum
es.ine.pop_65_69 | Population age 65 to 69 | Numeric | sum
es.ine.pop_70_74 | Population age 70 to 74 | Numeric | sum
es.ine.pop_75_79 | Population age 75 to 79 | Numeric | sum
es.ine.pop_80_84 | Population age 80 to 84 | Numeric | sum
es.ine.pop_85_89 | Population age 85 to 89 | Numeric | sum
es.ine.pop_90_94 | Population age 90 to 94 | Numeric | sum
es.ine.pop_95_99 | Population age 95 to 99 | Numeric | sum
es.ine.pop_100_more | Population age 100 or more | Numeric | sum
(21 rows)
gis=# select * from observatory.obs_column_to_column where source_id in (select id from observatory.obs_column where name ilike 'population%');
source_id | target_id | reltype
-------------------------+-------------+-------------
es.ine.pop_0_4 | es.ine.t1_1 | denominator
es.ine.pop_5_9 | es.ine.t1_1 | denominator
es.ine.pop_10_14 | es.ine.t1_1 | denominator
es.ine.pop_15_19 | es.ine.t1_1 | denominator
es.ine.pop_20_24 | es.ine.t1_1 | denominator
es.ine.pop_25_29 | es.ine.t1_1 | denominator
es.ine.pop_30_34 | es.ine.t1_1 | denominator
es.ine.pop_35_39 | es.ine.t1_1 | denominator
es.ine.pop_40_44 | es.ine.t1_1 | denominator
es.ine.pop_45_49 | es.ine.t1_1 | denominator
es.ine.pop_50_54 | es.ine.t1_1 | denominator
es.ine.pop_55_59 | es.ine.t1_1 | denominator
es.ine.pop_60_64 | es.ine.t1_1 | denominator
es.ine.pop_65_69 | es.ine.t1_1 | denominator
es.ine.pop_70_74 | es.ine.t1_1 | denominator
es.ine.pop_75_79 | es.ine.t1_1 | denominator
es.ine.pop_80_84 | es.ine.t1_1 | denominator
es.ine.pop_85_89 | es.ine.t1_1 | denominator
es.ine.pop_90_94 | es.ine.t1_1 | denominator
es.ine.pop_95_99 | es.ine.t1_1 | denominator
es.ine.pop_100_more | es.ine.t1_1 | denominator
(21 rows)
gis=# select id, name, type, aggregate from observatory.obs_column where id = 'es.ine.t1_1';
id | name | type | aggregate
-------------+------------------+---------+-----------
es.ine.t1_1 | Total population | Numeric | sum
(1 row)
Example ETL/metadata pipeline¶
This is a quick guide to building a full ETL pipeline, along with associated metadata, for the Data Observatory.
As an example, we will bring in the Quarterly Census of Employment and Wages (QCEW), a product of the Bureau of Labor Statistics. This dataset tracks the number of employees, firms, and average wages across the full gamut of North American Industry Classification System (NAICS) industries.
QCEW is, of course, a quarterly release, and counties are the smallest geography considered.
The process of building a Python module to bring a new dataset into the Data Observatory can be broadly divided into six steps:
1. Import libraries
2. Download the data
3. Import the data into PostgreSQL
4. Preprocess the data in PostgreSQL
5. Write metadata
6. Populate the output table

We use Luigi to isolate each step into a Task. A Task has well-defined inputs (other tasks) and outputs (files, tables on disk, etc.). In a nutshell:
- a task will not be run if it is already complete
- if all of a Task's outputs exist, then it is complete
- in order to run, all of a Task's requirements must be complete

Each of the steps except (1) corresponds to a Task. The actual flow of Task dependencies could be charted like this:
Step | Utility classes
---|---
Download data | DownloadUnzipTask, Task
Import data | CSV2TempTableTask, Shp2TempTableTask, TempTableTask
Preprocess data | TempTableTask
Write metadata | ColumnsTask
Output table | TableTask
Each step should be a Task subclassed from the noted Bigmetadata utility class. We use this set of utility classes to avoid writing repetitive code.
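For reference, the bare Luigi contract that these utility classes build on looks roughly like this (a sketch with hypothetical names, not part of the QCEW pipeline):
from luigi import Task, LocalTarget

class ExampleTask(Task):
    def requires(self):
        # Other Task instances that must be complete before this one can run
        return []

    def output(self):
        # The artifact this task produces; if it exists, the task is complete
        return LocalTarget('tmp/example/output.txt')

    def run(self):
        # Do the work and write the output
        with self.output().open('w') as fhandle:
            fhandle.write('done')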
To get started, make sure you’re running the IPython notebook container.
docker-compose up -d ipython
Then, get the port of the running IPython notebook container:
make ps
And navigate to it in your browser.
1. Import libraries¶
# Import a test runner
from tests.util import runtask
# We'll need these basic utility classes and methods
from tasks.util import underscore_slugify, shell, classpath
from tasks.base_tasks import (TempTableTask, TableTask, ColumnsTask,
DownloadUnzipTask, CSV2TempTableTask)
from tasks.meta import current_session, DENOMINATOR
# We like OrderedDict because it makes it easy to pass dicts
# like {column name : column definition, ..} where order still
# can matter in SQL
from collections import OrderedDict
from luigi import IntParameter, Parameter
import os
# These imports are useful for checking the database
from tasks.meta import OBSTable, OBSColumn, OBSTag
# We'll also want these tags for metadata
from tasks.tags import SectionTags, SubsectionTags, UnitTags
2. Download the data¶
The first step of most ETLs is going to be downloading the source and saving it to a temporary folder.
DownloadUnzipTask is a utility class that handles the file naming and unzipping of the temporary output for you. You just have to write the code which will do the download to the output file name.
class DownloadQCEW(DownloadUnzipTask):
    year = IntParameter()
    URL = 'http://www.bls.gov/cew/data/files/{year}/csv/{year}_qtrly_singlefile.zip'

    def download(self):
        shell('wget -O {output}.zip {url}'.format(
            output=self.output().path,
            url=self.URL.format(year=self.year)
        ))
Within the IPython environment, we can create and run the task within a sandbox.
We have to specify the year, since it’s specified as a parameter without a default.
download_task = DownloadQCEW(year=2014)
runtask(download_task)
Provided the output folder exists, the DownloadQCEW task for 2014 will not run again.
download_task.output().path
'tmp/tmp/DownloadQCEW_2014_cfabf27024'
download_task.output().exists()
True
3. Import data into PostgreSQL¶
A lot of processing can be done in PostgreSQL quite easily. We have utility classes to more easily bring both Shapefiles and CSVs into PostgreSQL.
For CSV2TempTableTask, we only have to define an input_csv method that will return a path (or iterable of paths) to the CSV(s). The header row will automatically be checked and used to construct a schema to bring the data in.
The standard requires method of Luigi is used here, too. This requires that the DownloadQCEW task for the same year must be run beforehand; the output from that task is now accessible as the input of this one.
class RawQCEW(CSV2TempTableTask):
    year = IntParameter()

    def requires(self):
        return DownloadQCEW(year=self.year)

    def input_csv(self):
        return os.path.join(self.input().path, '{}.q1-q4.singlefile.csv'.format(self.year))
Run the task. If the table exists and has more than 0 rows, it will not be run again.
current_session().rollback()
raw_task = RawQCEW(year=2014)
runtask(raw_task)
Confirm the task has completed successfully.
raw_task.complete()
True
The session can be used to execute raw queries against the table. The output of a TempTableTask can be queried directly by using its table attribute, which is a string with the fully schema-qualified table name. We are guaranteed names that are unique to the module/task/parameters without having to come up with any names manually.
raw_task.output().table
'"tmp".RawQCEW_2014_cfabf27024'
session = current_session()
resp = session.execute('select count(*) from {}'.format(raw_task.output().table))
resp.fetchall()
[(14276508L,)]
4. Preprocess data in PostgreSQL¶
QCEW data has a lot of rows we don’t actually need – these can be filtered out in SQL easily.
For QCEW, the download files are annual, but contain quarterly time periods. Output tables should be limited to a single point in time. We're also only interested in private employment (own_code = '5') and county-level aggregation by total (71), supersector (73), and NAICS sector (74).
class SimpleQCEW(TempTableTask):
    year = IntParameter()
    qtr = IntParameter()

    def requires(self):
        return RawQCEW(year=self.year)

    def run(self):
        session = current_session()
        session.execute("CREATE TABLE {output} AS "
                        "SELECT * FROM {input} "
                        "WHERE agglvl_code IN ('74', '73', '71') "
                        "  AND year = '{year}' "
                        "  AND qtr = '{qtr}' "
                        "  AND own_code = '5' ".format(
                            input=self.input().table,
                            output=self.output().table,
                            year=self.year,
                            qtr=self.qtr,
                        ))
Run the task and confirm it completed. We don’t have to run each step as we write it, as requirements guarantee anything required will be run.
simple_task = SimpleQCEW(year=2014, qtr=4)
runtask(simple_task)
simple_task.complete()
True
simple_task.output().table
'"tmp".SimpleQCEW_4_2014_79152e4934'
resp = session.execute('select count(*) from {}'.format(simple_task.output().table))
resp.fetchall()
[(97167L,)]
5. Write metadata¶
We have to create metadata for the measures we're interested in from QCEW. Metadata tasks often don't take parameters, but this one does, since we have to reorganize the table from one row per NAICS code to one column per NAICS code, which is easiest to do programmatically.
The ColumnsTask provides a structure for generating metadata. The only required method is columns. What must be returned from that method is an OrderedDict whose values are all OBSColumn and whose keys are all strings. The keys may be used as human-readable column names in tables based off this metadata, although that is not always the case. If the id of the OBSColumn is left blank, the dict's key will be used to generate it (qualified by the module).

Also, conventionally there will be a requires method that brings in our standard tags: SectionTags, SubsectionTags, and UnitTags. This is an example of defining several tasks as prerequisites: the outputs of those tasks will be accessible via self.input()[<key>] in other methods.
from tasks.us.naics import (NAICS_CODES, is_supersector, is_sector,
get_parent_code)
class QCEWColumns(ColumnsTask):
    naics_code = Parameter()

    def requires(self):
        requirements = {
            'sections': SectionTags(),
            'subsections': SubsectionTags(),
            'units': UnitTags(),
        }
        parent_code = get_parent_code(self.naics_code)
        if parent_code:
            requirements['parent'] = QCEWColumns(naics_code=parent_code)
        return requirements

    def columns(self):
        cols = OrderedDict()
        code, name, description = self.naics_code, NAICS_CODES[self.naics_code], ''
        # This gives us easier access to the tags we defined as dependencies
        input_ = self.input()
        units = input_['units']
        sections = input_['sections']
        subsections = input_['subsections']
        parent = input_.get('parent')
        cols['avg_wkly_wage'] = OBSColumn(
            # Make sure the column ID is unique within this module
            # If left blank, will be taken from this column's key in the output OrderedDict
            id=underscore_slugify(u'avg_wkly_wage_{}'.format(code)),
            # The PostgreSQL type of this column. Generally Numeric for numbers and Text
            # for categories.
            type='Numeric',
            # Human-readable name. Will be used as header in the catalog
            name=u'Average weekly wage for {} establishments'.format(name),
            # Human-readable description. Will be used as content in the catalog.
            description=u'Average weekly wage for a given quarter in the {name} industry (NAICS {code}). '
                        u'{name} is {description}.'.format(name=name, code=code, description=description),
            # Ranking of importance, sometimes used to favor certain measures in auto-selection
            # Weight of 0 will hide this column from the user. We generally use between 0 and 10
            weight=5,
            # How this measure was derived, for example "sum", "median", "average", etc.
            # In cases of "sum", this means functions downstream can construct estimates
            # for arbitrary geographies
            aggregate='average',
            # Tags are our way of noting aspects of this measure like its unit, the country
            # it's relevant to, and which section(s) of the catalog it should appear in.
            tags=[units['money'], sections['united_states'], subsections['income']],
        )
        cols['qtrly_estabs'] = OBSColumn(
            id=underscore_slugify(u'qtrly_estabs_{}'.format(code)),
            type='Numeric',
            name=u'Establishments in {}'.format(name),
            description=u'Count of establishments in a given quarter in the {name} industry (NAICS {code}). '
                        u'{name} is {description}.'.format(name=name, code=code, description=description),
            weight=5,
            aggregate='sum',
            tags=[units['businesses'], sections['united_states'], subsections['commerce_economy']],
            targets={parent['qtrly_estabs']: DENOMINATOR} if parent else {},
        )
        cols['month3_emplvl'] = OBSColumn(
            id=underscore_slugify(u'month3_emplvl_{}'.format(code)),
            type='Numeric',
            name=u'Employees in {} establishments'.format(name),
            description=u'Number of employees in the third month of a given quarter with the {name} '
                        u'industry (NAICS {code}). {name} is {description}.'.format(
                            name=name, code=code, description=description),
            weight=5,
            aggregate='sum',
            tags=[units['people'], sections['united_states'], subsections['employment']],
        )
        cols['lq_avg_wkly_wage'] = OBSColumn(
            id=underscore_slugify(u'lq_avg_wkly_wage_{}'.format(code)),
            type='Numeric',
            name=u'Average weekly wage location quotient for {} establishments'.format(name),
            description=u'Location quotient of the average weekly wage for a given quarter relative to '
                        u'the U.S. (Rounded to the hundredths place) within the {name} industry (NAICS {code}). '
                        u'{name} is {description}.'.format(name=name, code=code, description=description),
            weight=3,
            aggregate=None,
            tags=[units['ratio'], sections['united_states'], subsections['income']],
        )
        cols['lq_qtrly_estabs'] = OBSColumn(
            id=underscore_slugify(u'lq_qtrly_estabs_{}'.format(code)),
            type='Numeric',
            name=u'Location quotient of establishments in {}'.format(name),
            description=u'Location quotient of the quarterly establishment count relative to '
                        u'the U.S. (Rounded to the hundredths place) within the {name} industry (NAICS {code}). '
                        u'{name} is {description}.'.format(name=name, code=code, description=description),
            weight=3,
            aggregate=None,
            tags=[units['ratio'], sections['united_states'], subsections['commerce_economy']],
        )
        cols['lq_month3_emplvl'] = OBSColumn(
            id=underscore_slugify(u'lq_month3_emplvl_{}'.format(code)),
            type='Numeric',
            name=u'Employment level location quotient in {} establishments'.format(name),
            description=u'Location quotient of the employment level for the third month of a given quarter '
                        u'relative to the U.S. (Rounded to the hundredths place) within the {name} '
                        u'industry (NAICS {code}). {name} is {description}.'.format(
                            name=name, code=code, description=description),
            weight=3,
            aggregate=None,
            tags=[units['ratio'], sections['united_states'], subsections['employment']],
        )
        return cols
We should never run metadata tasks on their own – they should be defined as requirements of a TableTask, below – but it is possible to do so, as an example.

NAICS code '1025' is the supersector for education & health.
education_health_columns = QCEWColumns(naics_code='1025')
runtask(education_health_columns)
education_health_columns.complete()
True
Output from a ColumnsTask is an OrderedDict with the columns wrapped in ColumnTargets, which allow us to pass them around without immediately committing them to the database.
education_health_columns.output()
OrderedDict([('avg_wkly_wage', <tasks.targets.ColumnTarget at 0x7f12a40eead0>),
('qtrly_estabs', <tasks.targets.ColumnTarget at 0x7f12a5831c50>),
('month3_emplvl', <tasks.targets.ColumnTarget at 0x7f12a5831090>),
('lq_avg_wkly_wage', <tasks.targets.ColumnTarget at 0x7f12a4338b10>),
('lq_qtrly_estabs', <tasks.targets.ColumnTarget at 0x7f12a4d1fb50>),
('lq_month3_emplvl',
<tasks.targets.ColumnTarget at 0x7f12a4525390>)])
We can check the OBSColumn table for evidence that our metadata has been committed to disk, since we ran the task.
[(col.id, col.name) for col in session.query(OBSColumn)[:5]]
[(u'tmp.avg_wkly_wage_10',
u'Average weekly wage for Total, all industries establishments'),
(u'tmp.qtrly_estabs_10', u'Establishments in Total, all industries'),
(u'tmp.month3_emplvl_10',
u'Employees in Total, all industries establishments'),
(u'tmp.lq_avg_wkly_wage_10',
u'Average weekly wage location quotient for Total, all industries establishments'),
(u'tmp.lq_qtrly_estabs_10',
u'Location quotient of establishments in Total, all industries')]
6. Populate output table¶
Now that we have our data in a format similar to what we'll need, and our metadata lined up, we can tie it together with a TableTask.

Under the hood, TableTask handles the relational lifting between columns and actual data, and assigns a hash number to the dataset.

Several methods must be overridden for TableTask to work:
- version(): a version control number, which is useful for forcing a re-run/overwrite without having to track down and delete output artifacts.
- table_timespan(): the timespan (for example, '2014' or '2012Q4') that identifies the date range or point in time for this table.
- columns(): an OrderedDict of (colname, ColumnTarget) pairs. This should be constructed by pulling the desired columns from required ColumnsTask classes.
- populate(): a method that should populate (most often via INSERT) the output table.
# Since we have a column ('area_fips') that is a shared reference to
# geometries ('geom_ref') we have to import that column.
from tasks.us.census.tiger import GeoidColumns
class QCEW(TableTask):
    year = IntParameter()
    qtr = IntParameter()

    def version(self):
        return 1

    def requires(self):
        requirements = {
            'data': SimpleQCEW(year=self.year, qtr=self.qtr),
            'geoid_cols': GeoidColumns(),
            'naics': OrderedDict()
        }
        for naics_code, naics_name in NAICS_CODES.iteritems():
            # Only include the more general NAICS codes
            if is_supersector(naics_code) or is_sector(naics_code) or naics_code == '10':
                requirements['naics'][naics_code] = QCEWColumns(naics_code=naics_code)
        return requirements

    def table_timespan(self):
        return get_timespan('{year}Q{qtr}'.format(year=self.year, qtr=self.qtr))

    def columns(self):
        # Here we assemble an OrderedDict using our requirements to specify the
        # columns that go into this table.
        # The column names are built from the measure key and NAICS code/name below.
        input_ = self.input()
        cols = OrderedDict([
            ('area_fips', input_['geoid_cols']['county_geoid'])
        ])
        for naics_code, naics_cols in input_['naics'].iteritems():
            for key, coltarget in naics_cols.iteritems():
                naics_name = NAICS_CODES[naics_code]
                colname = underscore_slugify(u'{}_{}_{}'.format(
                    key, naics_code, naics_name))
                cols[colname] = coltarget
        return cols

    def populate(self):
        # This select statement transforms the input table, taking advantage of our
        # new column names.
        # The session is automatically committed if there are no errors.
        session = current_session()
        columns = self.columns()
        colnames = columns.keys()
        select_colnames = []
        for naics_code, naics_columns in self.input()['naics'].iteritems():
            for colname, coltarget in naics_columns.iteritems():
                select_colnames.append('''MAX(CASE
                    WHEN industry_code = '{naics_code}' THEN {colname} ELSE NULL
                END)::Numeric'''.format(naics_code=naics_code,
                                        colname=colname))
        insert = '''INSERT INTO {output} ({colnames})
                    SELECT area_fips, {select_colnames}
                    FROM {input}
                    GROUP BY area_fips '''.format(
            output=self.output().table,
            input=self.input()['data'].table,
            colnames=', '.join(colnames),
            select_colnames=', '.join(select_colnames),
        )
        session.execute(insert)
On a fresh database, complete() would initially return False. The task will not run again once it has been run for this year & quarter combination.
table_task = QCEW(year=2014, qtr=4)
runtask(table_task)
table_task.complete()
True
The table should exist in metadata, as well as in data, with all relations well-defined.
Unlike the TempTableTasks above, the output of a TableTask is a Postgres table in the observatory schema, with a unique hash name.
table = table_task.output()
table.table
'observatory.obs_3dc49b70f71ed9bbf5b4a48773c860519af70e1e'
It’s possible for us to peek at the output data.
session.execute('SELECT * FROM {} LIMIT 1'.format(table.table)).fetchall()
[(u'01001', None, None, None, None, None, None, Decimal('395'), Decimal('5'), Decimal('144'), Decimal('0.65'), Decimal('0.52'), Decimal('0.68'), Decimal('609'), Decimal('80'), Decimal('1024'), Decimal('0.96'), Decimal('0.66'), Decimal('0.74'), Decimal('364'), Decimal('68'), Decimal('368'), Decimal('0.79'), Decimal('0.95'), Decimal('1.13'), Decimal('917'), Decimal('3'), Decimal('66'), Decimal('0.68'), Decimal('0.94'), Decimal('1.00'), Decimal('2317'), Decimal('5'), Decimal('103'), Decimal('1.89'), Decimal('3.26'), Decimal('2.45'), Decimal('914'), Decimal('77'), Decimal('426'), Decimal('1.17'), Decimal('1.16'), Decimal('0.90'), Decimal('1231'), Decimal('33'), Decimal('157'), Decimal('1.26'), Decimal('0.60'), Decimal('0.35'), Decimal('925'), Decimal('20'), Decimal('198'), Decimal('1.14'), Decimal('1.66'), Decimal('1.30'), Decimal('914'), Decimal('77'), Decimal('426'), Decimal('1.17'), Decimal('1.16'), Decimal('0.90'), Decimal('1225'), Decimal('30'), Decimal('1347'), Decimal('1.45'), Decimal('1.01'), Decimal('1.44'), Decimal('584'), Decimal('85'), Decimal('1168'), Decimal('0.93'), Decimal('0.65'), Decimal('0.73'), Decimal('904'), Decimal('91'), Decimal('380'), Decimal('0.98'), Decimal('0.61'), Decimal('0.25'), None, None, None, None, None, None, Decimal('433'), Decimal('149'), Decimal('1935'), Decimal('1.13'), Decimal('1.63'), Decimal('1.57'), Decimal('1225'), Decimal('30'), Decimal('1347'), Decimal('1.45'), Decimal('1.01'), Decimal('1.44'), Decimal('274'), Decimal('66'), Decimal('1432'), Decimal('1.10'), Decimal('1.13'), Decimal('1.50'), Decimal('301'), Decimal('8'), Decimal('66'), Decimal('0.53'), Decimal('0.68'), Decimal('0.44'), Decimal('620'), Decimal('15'), Decimal('127'), Decimal('0.97'), Decimal('0.73'), Decimal('0.36'), None, None, None, None, None, None, Decimal('929'), Decimal('17'), Decimal('132'), Decimal('2.13'), Decimal('1.91'), Decimal('1.53'), Decimal('677'), Decimal('768'), Decimal('8173'), Decimal('0.97'), Decimal('0.95'), Decimal('0.91'), Decimal('364'), Decimal('68'), Decimal('368'), Decimal('0.79'), Decimal('0.95'), Decimal('1.13'), Decimal('275'), Decimal('74'), Decimal('1498'), Decimal('0.94'), Decimal('1.06'), Decimal('1.35'), Decimal('584'), Decimal('202'), Decimal('2322'), Decimal('1.01'), Decimal('1.20'), Decimal('1.12'), Decimal('781'), Decimal('114'), Decimal('430'), Decimal('0.70'), Decimal('1.55'), Decimal('0.72'), Decimal('1201'), Decimal('7'), Decimal('36'), Decimal('1.02'), Decimal('0.52'), Decimal('0.17'), Decimal('0'), Decimal('3'), Decimal('0'), Decimal('0'), Decimal('0.57'), Decimal('0'), Decimal('860'), Decimal('62'), Decimal('190'), Decimal('0.69'), Decimal('0.62'), Decimal('0.29'), Decimal('0'), Decimal('26'), Decimal('0'), Decimal('0'), Decimal('0.59'), Decimal('0'), Decimal('1201'), Decimal('7'), Decimal('36'), Decimal('1.02'), Decimal('0.52'), Decimal('0.17'), None, None, None, None, None, None, Decimal('628'), Decimal('41'), Decimal('122'), Decimal('0.87'), Decimal('1.28'), Decimal('0.77'), Decimal('842'), Decimal('73'), Decimal('308'), Decimal('0.67'), Decimal('1.75'), Decimal('0.71'))]
Development¶
Writing ETL tasks is pretty repetitive. tasks.util contains a number of functions and classes that are meant to make life easier through reusability.
Utility Functions¶
These functions are very frequently used within the methods of a new ETL task.
tasks.meta.current_session()¶

Returns the session relevant to the currently operating Task, if any. Outside the context of a Task, this can still be used for manual session management.
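For example, from an IPython session or a one-off script you can use it to inspect metadata directly (a minimal sketch):
from tasks.meta import current_session, OBSColumn

session = current_session()
print(session.query(OBSColumn).count())
session.rollback()  # discard anything you don't intend to commit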
Abstract classes¶
These are the building blocks of the ETL, and should almost always be subclassed from when writing a new process.
Batteries included¶
Data comes in many flavors, but sometimes it comes in the same flavor over and over again. These tasks are meant to take care of the most repetitive aspects.
Running and Re-Running Pieces of the ETL¶
When doing local development, it's advisable to run small pieces of the ETL locally to make sure everything works correctly. You can use the make -- run helper, documented in Run any task. There are several methods for re-running pieces of the ETL, depending on the task; they are described below.
Using --force during development¶
When developing with abstract classes that offer a force parameter, you can use it to re-run a task that has already been run, ignoring and overwriting all output it has already created. For example, if you have a tasks.base_tasks.TempTableTask that you've modified in the course of development and need to re-run:
from tasks.base_tasks import TempTableTask
from tasks.meta import current_session
class MyTempTable(TempTableTask):
    def run(self):
        session = current_session()
        session.execute('''
            CREATE TABLE {} AS SELECT 'foo' AS mycol;
        '''.format(self.output().table))
Running make -- run path.to.module MyTempTable will only work once, even after making changes to the run method.

However, running make -- run path.to.module MyTempTable --force will force the task to be run again, dropping and re-creating the output table.
Deleting byproducts to force a re-run of parts of ETL¶
In some cases, you may have a luigi.Task you want to re-run that does not have a force parameter. In such cases, you should look at its output method and delete whatever files or database tables it created.

Utility classes will put their file byproducts in the tmp folder, inside a folder named after the module name. They will put database byproducts into a schema that is named after the module name, too.
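For example, to force a re-run of the QCEW download and import tasks from the walkthrough above, you would delete their byproducts. The exact paths and table names depend on your parameters and module, so check each task's output() first; the ones below are only illustrative:
rm -rf tmp/tmp/DownloadQCEW_2014_cfabf27024

Then, from make psql:
DROP TABLE "tmp".RawQCEW_2014_cfabf27024;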
Update the ETL & metadata through version¶
When you make changes and improvements, you can increment the version method of tasks.base_tasks.TableTask, tasks.base_tasks.ColumnsTask and tasks.base_tasks.TagsTask to force the task to run again.
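For example, once the QCEW table from the walkthrough above has been built, bumping its version forces it to be rebuilt on the next run (a sketch; only the changed method is shown):
class QCEW(TableTask):
    ...

    def version(self):
        return 2  # was 1; incrementing forces the table and its metadata to be regenerated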
Convenience tasks¶
There are a number of tasks and functions useful for basic, repetitive operations like interacting with or uploading tables to CARTO.
Makefile¶
The Makefile makes it easier to run tasks.
Run any task¶
Any task can be run with:
make -- run path.to.module ClassName --param-name-1 value1 --param-name-2 value2
For example:
make -- run us.bls QCEW --year 2014 --qtr 4
Other tasks¶
- make dump: Runs DumpS3
- make restore <path/to/dump>: Restore the database from a dump
- make sync-meta: Runs SyncMetadata
- make sync-data: Runs SyncAllData
- make sh: Drop into an interactive shell in the Docker container
- make psql: Drop into an interactive psql session in the database
- make kill: Kill all Docker processes
- make docs: Regenerate all documentation
- make catalog: Regenerate the HTML catalog
Metadata model¶
Our metadata is contained in six highly related tables, which are defined using SQLAlchemy classes.
Relational Diagram¶
Table | SQLAlchemy class
---|---
obs_column | OBSColumn
obs_column_table | OBSColumnTable
obs_column_to_column | OBSColumnToColumn
obs_column_tag | OBSColumnTag
obs_table | OBSTable
obs_tag | OBSTag
Manually generated entities¶
class tasks.meta.OBSColumn(**kwargs)¶

Describes the characteristics of data in a column, which can exist in multiple physical tables.

These should only be instantiated to generate the return value of ColumnsTask.columns(). Any other usage could have unexpected consequences.

Attributes:

- id: The unique identifier for this column. Should be qualified by the module name of the class that created it. It is automatically generated by ColumnsTask if left unspecified, and automatically qualified by module name either way.
- type: The type of this column – for example, Text, Numeric, Geometry, etc. This is used to generate a schema for any table using this column.
- name: The human-readable name of this column. This is used in the catalog and API.
- description: The human-readable description of this column. This is used in the catalog.
- weight: A numeric weight for this column. Higher weights are favored in certain rankings done by the API. Defaults to zero, which hides the column from the catalog.
- aggregate: How this column can be aggregated. For example, populations can be summed, so a population column should be sum. Should be left blank if the column should not be aggregated.
- tables: Iterable of all linked OBSColumnTables, which can be traversed to find all tables with this column.
- tags: Iterable of all linked OBSColumnTags, which can be traversed to find all tags applying to this column.
- targets: Dict of all related columns. Format is <OBSColumn>: <reltype>.
- version: A version control number, used to determine whether the column and its metadata should be updated.
- extra: Arbitrary additional information about this column stored as JSON.

Methods:

- catalog_lonlat(): Return tuple (longitude, latitude) for the catalog for this measurement.
- geom_timespans(): Return a dict of geom columns and timespans that this measure is available for.
- has_catalog_image(): Returns True if this column has a pre-generated image for the catalog.
- has_children(): Returns True if this column has children, False otherwise.
- has_denominators(): Returns True if this column has denominators, False otherwise.
- is_cartographic(): Returns True if this column is a geometry that can be used for cartography.
- is_geomref(): Returns True if the column is a geomref, otherwise False.
- is_interpolation(): Returns True if this column is a geometry that can be used for interpolation.
- license_tags(): Return license tags.
- source_tags(): Return source tags.
- summable(): Returns True if we can sum this column and calculate values for arbitrary areas.
class tasks.meta.OBSTag(**kwargs)¶

Tags permit arbitrary groupings of columns.

They should only be created as part of a tags() implementation.

Attributes:

- id: The unique identifier for this tag. Is always qualified by the module name of the TagsTask that created it, and should never be specified manually.
- name: The name of this tag. This is exposed in the API and user interfaces.
- type: The type of this tag. This is used to provide more information about what the tag means. Examples are section, subsection, license, unit, although any arbitrary type can be defined.
- description: Description of this tag. This may be exposed in the API and catalog.
- version: A version control number, used to determine whether the tag and its metadata should be updated.
Autogenerated entities¶
class tasks.meta.OBSColumnToColumn(**kwargs)¶

Relates one column to another. For example, a Text column may contain identifiers that are unique to geometries in another Geometry column. In that case, an OBSColumnToColumn object of reltype GEOM_REF should indicate the relationship, and make relational joins possible between tables that have both columns with those that only have one.

These should never be created manually. Their creation should be handled automatically from specifying OBSColumn.targets.

These are unique on (source_id, target_id).

Attributes:

- reltype: Required text specifying the relation type. Examples are GEOM_REF and DENOMINATOR.
class tasks.meta.OBSColumnTable(**kwargs)¶

Glues together OBSColumn and OBSTable. If this object exists, the related column should exist in the related table, and can be selected with colname.

Unique along both (column_id, table_id) and (table_id, colname).

These should never be created manually. Their creation and removal is handled automatically as part of targets.TableTarget.update_or_create_metadata().

Attributes:

- colname: Column name for selecting this column in a table.
- extra: Extra JSON information about the data. This could include statistics like max, min, average, etc.
class tasks.meta.OBSTable(**kwargs)¶

Describes a physical table in our database.

These should never be instantiated manually. They are automatically created by output(). The unique key is id.

Attributes:

- id: The unique identifier for this table. Is always qualified by the module name of the class that created it.
- columns: An iterable of all the OBSColumnTable instances contained in this table.
- tablename: The automatically generated name of this table, which can be used directly in select statements. Of the format obs_<hash>.
- timespan: An OBSTimespan instance containing information about the timespan this table applies to. Obtained from timespan().
- the_geom: A simple geometry approximating the boundaries of the data in this table.
- description: A description of the table. Not used.
- version: A version control number, used to determine whether the table and its metadata should be updated.

Methods:

- geom_column(): Return the geometry column for this table, if it has one. Returns None if there is none.
- geomref_column(): Return the geomref column for this table, if it has one. Returns None if there is none.
class tasks.meta.OBSColumnTag(**kwargs)¶

Glues together OBSColumn and OBSTag. If this object exists, the related column should be tagged with the related tag.

Unique along (column_id, tag_id).

These should never be created manually. Their creation and removal is handled automatically as part of targets.TableTarget.update_or_create_metadata().
Validating your code¶
- Best practices
- Proper use of utility classes
- Clearly documented command that runs a WrapperTask to create everything
- Use parameters only when necessary
- Use default parameter values sparingly
- Keep fewer than 1000 columns per table
- Each geometry column should have a unique geom_ref column with it
- Specify section, subsection, source tags and license tags for all columns
- Specify unit tags for all measure columns
- Making sure ETL code works right
- Making sure metadata works right
- Regenerate and look at the Catalog
- Upload to a test CARTO server
Best practices¶
Writing ETL code is meant to be open-ended, but there are some standards that should be followed to help keep the code clean and clear.
Proper use of utility classes¶
There are extensive abstract-classes available for development. These can do things like download and unzip a file to disk (tasks.base_tasks.DownloadUnzipTask) and import a CSV on disk to a temporary table (tasks.base_tasks.CSV2TempTableTask). These classes should be used when possible to minimize specialized ETL code. In particular, these tasks save output to well-known locations so as to avoid redundantly running the same tasks.
Clearly documented command that runs a WrapperTask to create everything¶
Oftentimes an ETL will have to loop over a parameter to get all the data – for example, if a dataset is available online year-by-year, it may make sense to write a single task that downloads one year's file, with a parameter specifying which year.
luigi.WrapperTask is a way to make sure such tasks are executed for every relevant parameter programmatically. A powerful example of this can be found with tasks.us.AllZillow, which executes a tasks.us.zillow.Zillow task once for each geography level, year, and month in that year.
A generic example of using a luigi.WrapperTask:
from luigi import WrapperTask, Task, Parameter
class MyTask(Task):
    '''
    This task needs to be run for each possible `geog`
    '''
    geog = Parameter()

    def run(self):
        pass

    def output(self):
        pass

class MyWrapperTask(WrapperTask):
    '''
    Execute `MyTask` once for each possible `geog`.
    '''
    def requires(self):
        for geog in ('state', 'county', 'city'):
            yield MyTask(geog=geog)
Use parameters only when necessary¶
Tasks are unique to their parameters. In other words, if a task is run once with a certain set of parameters, it will not be run again unless the output it generated is deleted.
Therefore it’s very important to not have parameters available in a Task’s definition that do not affect its result. If you have such extraneous parameters, it would be possible to run a task redundantly.
An example of this:
from luigi import Parameter
from tasks.base_tasks import DownloadUnzipTask

class MyBadTask(DownloadUnzipTask):
    goodparam = Parameter()
    badparam = Parameter()

    def url(self):
        return 'http://somesite/with/data/{}'.format(self.goodparam)
tasks.base_tasks.DownloadUnzipTask will generate the location for a unique output file automatically based off of all its params, but badparam above doesn't actually affect the file being downloaded. That means if we change badparam we'll download the same file twice.
Use default parameter values sparingly¶
The above bad practice is easily paired with setting default values for parameters. For example:
from luigi import Parameter
from tasks.base_tasks import DownloadUnzipTask

class MyBadTask(DownloadUnzipTask):
    '''
    My URL doesn't depend on `badparam`!
    '''
    goodparam = Parameter()
    badparam = Parameter(default='foo')

    def url(self):
        return 'http://somesite/with/data/{}'.format(self.goodparam)
Now it's easy to simply forget that badparam even exists! But it still affects the output filename, making it noisy and less clear which parameters actually matter.
Keep fewer than 1000 columns per table¶
Postgres has a hard limit on the number of columns. If you create a tasks.base_tasks.TableTask whose columns method returns an OrderedDict with substantially more than 1000 columns, the task will fail.
In such cases, you’ll want to split your tasks.base_tasks.TableTask into several pieces, likely pulling columns from the same tasks.base_tasks.ColumnsTask. There is no limit on the number of columns in a tasks.base_tasks.ColumnsTask.
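A sketch of that pattern, with hypothetical task names (version(), table_timespan() and the real column split are omitted for brevity):
from collections import OrderedDict
from tasks.base_tasks import ColumnsTask, TableTask

class MyWideColumns(ColumnsTask):
    def columns(self):
        # Returns an OrderedDict with a few thousand OBSColumns
        ...

class MyTableFirstHalf(TableTask):
    def requires(self):
        return {'columns': MyWideColumns()}

    def columns(self):
        allcols = self.input()['columns']
        # Keep each physical table safely under the Postgres column limit
        return OrderedDict(list(allcols.items())[:900])

    def populate(self):
        # INSERT only the columns selected above
        ...

class MyTableSecondHalf(TableTask):
    def requires(self):
        return {'columns': MyWideColumns()}

    def columns(self):
        allcols = self.input()['columns']
        return OrderedDict(list(allcols.items())[900:])

    def populate(self):
        ...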
Each geometry column should have a unique geom_ref column with it¶
When setting up a tasks.base_tasks.ColumnsTask for Geometries, make sure that you store a meaningful and unique geom_ref from the same table.
- It is meaningful if it can be found as a way to refer to that geometry in data sources elsewhere – for example, FIPS codes are meaningful references to county geometries in the USA. However, the automatically generated serial ogc_fid column from a Shapefile is not meaningful.
- It is unique if that geom_ref column has an ID that is not duplicated by any other columns.
For example:
from collections import OrderedDict

from tasks.base_tasks import ColumnsTask
from tasks.meta import OBSColumn, GEOM_REF
from luigi import Parameter

class MyGeoColumnsTask(ColumnsTask):
    resolution = Parameter()

    def columns(self):
        geom = OBSColumn(
            id=self.resolution,
            type='Geometry')
        geomref = OBSColumn(
            id=self.resolution + '_id',  # Make sure we have "+ '_id'"!
            type='Text',
            targets={geom: GEOM_REF})
        return OrderedDict([
            ('geom', geom),
            ('geomref', geomref)
        ])
No matter what resolution this Task is passed, it will generate a unique ID for both the geom and the geomref. If the + '_id' concatenation were missing, it would mean that the metadata model would not properly link geomrefs to the geometries they refer to.
Specify section, subsection, source tags and license tags for all columns¶
When defining your tasks.meta.OBSColumn objects in a tasks.base_tasks.ColumnsTask class, make sure each column is assigned a tasks.meta.OBSTag of type section, subsection, source, and license. Use shared tags from tasks.tags when possible, in particular for section and subsection.
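A sketch of wiring source and license tags into a ColumnsTask (the tag ids, names and tag task here are hypothetical; reuse existing tags from tasks.tags where they already exist):
from collections import OrderedDict

from tasks.base_tasks import ColumnsTask, TagsTask
from tasks.meta import OBSColumn, OBSTag
from tasks.tags import SectionTags, SubsectionTags, UnitTags

class MySourceLicenseTags(TagsTask):
    def tags(self):
        return [
            OBSTag(id='my-source', name='My Data Provider', type='source',
                   description='Where the data comes from'),
            OBSTag(id='my-license', name='My License', type='license',
                   description='Terms under which the data may be used'),
        ]

class MyMeasureColumns(ColumnsTask):
    def requires(self):
        return {
            'sections': SectionTags(),
            'subsections': SubsectionTags(),
            'units': UnitTags(),
            'source_license': MySourceLicenseTags(),
        }

    def columns(self):
        input_ = self.input()
        source_tag = input_['source_license']['my-source']
        license_tag = input_['source_license']['my-license']
        return OrderedDict([
            ('my_measure', OBSColumn(
                type='Numeric',
                name='My measure',
                aggregate='sum',
                tags=[input_['sections']['united_states'],
                      input_['subsections']['employment'],
                      input_['units']['people'],
                      source_tag, license_tag],
            )),
        ])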
Specify unit tags for all measure columns¶
When defining a tasks.meta.OBSColumn that will hold a measurement, make sure to define a unit using a tag. This could be something like people, money, etc. There are standard units accessible in tasks.tags.
Making sure ETL code works right¶
After having written an ETL, you’ll want to double check all of the following to make sure the code is usable.
Results and byproducts are being generated¶
When you use Run any task to run individual components:
- Were any exceptions thrown? On what task were they thrown? With which arguments?
- Are appropriate files being generated in the tmp folder?
- Are tables being created in the relevant tmp schema?
- Are tables and columns being added to the observatory.obs_table and observatory.obs_column metadata tables?
Provided tasks.base_tasks.TableTask and tasks.base_tasks.ColumnsTask classes were executed, it's wise to jump into the database and check to make sure entries were made in those tables.
make psql
SELECT COUNT(*) FROM observatory.obs_column WHERE id LIKE 'path.to.module.%';
SELECT COUNT(*) FROM observatory.obs_table WHERE id LIKE 'path.to.module.%';
SELECT COUNT(*) FROM observatory.obs_column_table
WHERE column_id LIKE 'path.to.module%'
AND table_id LIKE 'path.to.module%';
Delete old data to start from scratch to make sure everything works¶
When using the proper utility classes, your data on disk, for example from downloads that are part of the ETL, will be saved to a file or folder tmp/module.name/ClassName_Args.

In order to make sure the ETL is reproducible, it's wise to delete this folder or move it to another location after development, and re-run to make sure that the whole process can still run from start to finish.
Making sure metadata works right¶
Checking that the metadata works right is one of the more challenging parts of QA'ing new ETL code.
Regenerate the obs_meta table¶
The obs_meta table is a denormalized view of the underlying metadata objects that you've created when running tasks.
You can force the regeneration of this table using tasks.carto.OBSMetaToLocal:
make -- run carto OBSMetaToLocal --force
Once the table is generated, you can take a look at it in SQL:
make psql
If the metadata is working correctly, you should have more entries in obs_meta than before. If you were starting from nothing, there should be more than 0 rows in the table.
SELECT COUNT(*) FROM observatory.obs_meta;
If you already had data, you can filter obs_meta to look for new rows with a schema corresponding to what you added. For example, if you added metadata columns and tables in tasks/mx/inegi, you should look for columns with that schema:
SELECT COUNT(*) FROM observatory.obs_meta WHERE numer_id LIKE 'mx.inegi.%';
If nothing is appearing in obs_meta, chances are you are missing some metadata:
Have you defined and executed a proper tasks.base_tasks.TableTask?¶
You can check to see if these links exist by checking obs_column_table:
make psql
SELECT COUNT(*) FROM observatory.obs_column_table
WHERE column_id LIKE 'my.schema.%'
AND table_id LIKE 'my.schema.%';
If they don’t exist, make sure that your Python code roughly corresponds to:
from tasks.base_tasks import ColumnsTask, TableTask

class MyColumnsTask(ColumnsTask):
    def columns(self):
        # Return OrderedDict of columns here

class MyTableTask(TableTask):
    def table_timespan(self):
        # Return timespan here

    def requires(self):
        return {
            'columns': MyColumnsTask()
        }

    def columns(self):
        return self.input()['columns']

    def populate(self):
        # Populate the output table here
Unless the TableTask returns some of the columns from ColumnsTask in its own columns method, the links will not be initialized properly.

Finally, double check that you actually ran the TableTask using make -- run my.schema MyTableTask.
Are you defining geom_ref relationships properly?¶
In cases where a TableTask does not have its own geometries, at least one of the columns returned from its columns method needs to be in a geom_ref relationship. Here's an example:
from collections import OrderedDict

from tasks.base_tasks import ColumnsTask, TableTask
from tasks.meta import OBSColumn, GEOM_REF

class MyGeoColumnsTask(ColumnsTask):
    def columns(self):
        geom = OBSColumn(
            type='Geometry')
        geomref = OBSColumn(
            type='Text',
            targets={geom: GEOM_REF})
        return OrderedDict([
            ('geom', geom),
            ('geomref', geomref)
        ])

class MyColumnsTask(ColumnsTask):
    def columns(self):
        # Return OrderedDict of columns here

class MyTableTask(TableTask):
    def table_timespan(self):
        # Return timespan here

    def requires(self):
        return {
            'geom_columns': MyGeoColumnsTask(),
            'data_columns': MyColumnsTask()
        }

    def columns(self):
        cols = OrderedDict()
        cols['geomref'] = self.input()['geom_columns']['geomref']
        cols.update(self.input()['data_columns'])
        return cols

    def populate(self):
        # Populate the output table here
The above code would ensure that all columns existing inside MyTableTask would be appropriately linked to any geometries that connect to geomref.
Do you have both the data and geometries in your table?¶
You can check by running:
SELECT * FROM observatory.obs_table
WHERE id LIKE 'my.schema.%';
If there is only one table and it has a null “the_geom” boundary, then you are missing a geometry table. For example:
SELECT * from observatory.obs_table
WHERE id LIKE 'es.ine.five_year_population%';
id | tablename | timespan | the_geom | description | version
----------------------------------------+----------------------------------------------+----------+----------+-------------+---------
es.ine.five_year_population_99914b932b | obs_24b656e9e23d1dac2c8ab5786a388f9bf0f4e5ae | 2015 | | | 5
(1 row)
Notice that the_geom is empty. You will need to write a second TableTask with the following structure:
class Geometry(TableTask):
    def table_timespan(self):
        # Return timespan here

    def requires(self):
        return {
            'meta': MyGeoColumnsTask(),
            'data': RawGeometry()
        }

    def columns(self):
        return self.input()['meta']

    def populate(self):
        # Populate the output table here
Regenerate and look at the Catalog¶
Once tasks.carto.OBSMetaToLocal has been run, you can generate the catalog.
make catalog
You can view the generated Catalog in a browser window by going to the IP and port address for the nginx process. The current processes are shown with docker-compose ps or make ps.
- Are there any nasty typos or missing data?
- Variable names should be unique, human-readable, and concise. If the variable needs more in-depth definition, this should go in the “description” of the variable.
- Does the nesting look right? Are there columns not nested?
- Variables that are denominators should also have subcolumns of direct nested variables.
- There may be repetitive nesting if a variable is nested under two denominators, which is fine.
- Are sources and licenses populated for all measures?
- A source and license tasks.meta.OBSTag must be written for new sources and licenses
- Is a table with a boundary/timespan matrix appearing beneath each measure?
- If not, hardcode the sample latitude and longitude in tasks.meta.catalog_lonlat.
Upload to a test CARTO server¶
If you set CARTODB_API_KEY and CARTODB_URL in your .env file, in the format:
CARTODB_API_KEY=your_api_key
CARTODB_URL=https://username.carto.com
You will now be able to upload your data and metadata to CARTO for previewing.
make sync
Testing your data¶
ETL unit tests¶
Unit tests ensure that there are no errors in the underlying utility classes that could cause errors in code you build on top of them.
Tests are run with:
make etl-unittest
Metadata integration tests¶
Integration tests make sure that the metadata being generated as part of your ETL will actually be queryable by the API. For example, if you have an ETL that ingests data but does not ..
API unit tests¶
API unit tests make sure the observatory-extension, which reads data and metadata from the ETL, is working right.
In order for this to function, you'll need to clone a copy of observatory-extension into the root of the bigmetadata repo.
git clone git@github.com:cartodb/observatory-extension
make extension-unittest
Integration tests¶
Integration tests ensure that the data from the ETL that is set for deployment is able to return a measure for every piece of metadata.

As above, you'll need a copy of observatory-extension locally for this test to work.
git clone git@github.com:cartodb/observatory-extension
make extension-autotest