Tuesday, September 5, 2023

How to Level Up Beyond ETLs: From Query Optimization to Code Generation

The Slack messages are coming in hard and fast. Why is the cluster slow? How is gross revenue smaller than net revenue? When will the marketing columns get added? Why does the dashboard not match the enterprise accounts table? How long will the backfill take? You feel like quitting.

Getting technical people to solve problems faced by business people is not new. What’s new is how every part of the business now runs on data, and thus the growing need for data experts to keep things moving. The tricky part is when context goes ‘over the wall’: the consumer finds an issue with the data, but the producer isn’t familiar enough with the domain to see that it’s a problem. Over time, information transfer between the two sides becomes the bottleneck and issues pile up.

One proposed solution comes from Stitch Fix’s boldly-titled blog post Engineers Shouldn’t Write ETL. The idea is that the engineering team creates ‘Lego blocks’ that consumers then assemble into end-to-end data pipelines. This tends to work better when consumers are themselves technical, and in any case it requires a lot of upfront investment. More startups are created every day promising to make this process effortless, but for now you’re the data engineer and you need to figure it out.

Maybe this isn’t what you signed up for. Maybe there was some bait-and-switch; you were hired to write Python but due to ‘critical initiatives’ you end up writing SQL (side note: SQL is Turing complete). Maybe the manager who hired you left and the new one feels the team should have a wider scope. Maybe the CTO left and sales/marketing/finance/ops now has the CEO’s ear.

My friends, I feel your pain. We can’t turn back time, but perhaps we can reflect on the more interesting things to work on moving forwards.

Learn internals

Tell your customers you’ll speed up their queries and they’ll love you for it. I’ve worked with the following relational databases (or if you’re pedantic, RDBMS) in production: MySQL, Hive, Vertica, Snowflake, Redshift, Postgres. The common thread? Complaints about slow queries.

Relational databases have been around for a while, and are among the most ‘battle-tested’ software around. If your query is slow, odds are it’s your query and not the database.

How do we speed up our queries?

Databases used in data warehouses tend to be column-oriented; in this section we’ll focus on Redshift. The simplest way to make your queries run faster is to select only the columns you need. Let’s say you have a table where each row represents an event.

CREATE TABLE events (
    event_id      VARCHAR,
    event_ts      TIMESTAMP,
    user_id       VARCHAR,
    browser_id    VARCHAR
);

Suppose you only need the first two columns. Instead of selecting all the columns, you get a speedup for free by specifying only the ones you need.

-- Before: scans every column
SELECT
    *
FROM
    events;

-- After: scans only the columns needed
SELECT
    event_id
  , event_ts
FROM
    events;

Note this doesn’t apply to Postgres or MySQL or SQLite, since these databases are row-oriented.

In diagnosing slow queries, the main culprit tends to be joins. What I wish someone had told me about Redshift is to set the most common join column as the distribution key and the most common range filter column as the sort key. This is the 80/20.

Re: distribution keys, the motivating idea is fast query execution across multiple machines. The fancy term for this is massively-parallel processing (MPP). Since the data won’t fit in a single machine, the rows could either be spread out randomly, copied across all machines, or distributed using a bucketing mechanism like the hash of a specific column. The distribution key is that specific column: its hash decides which machine each row lands on.

Let’s look back at our table. Suppose event_id is the unique identifier for each event, but consumers mainly use the table to investigate user behavior. This means that most joins to other tables will be on user_id. Setting user_id as the distribution key means the rows will be distributed in the cluster based on the hash of user_id. If you’re joining to another table which also has a user_id distribution key, then the data is co-located. Less I/O means faster queries.

Even if only one table has user_id as the distribution key, just the other table’s rows need to be ‘fanned out’ across the cluster (instead of both). Like most operations that involve hashing, you also want user_id to have a low percentage of nulls and high cardinality, to ensure an even distribution.
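As a toy illustration of the bucketing idea, the Python sketch below maps a user_id to a slice. Redshift’s actual hash function is internal and not exposed, so the hashing here is purely illustrative.

import hashlib

# Illustration only: Redshift's real hash is not exposed. The point is that rows with
# the same user_id always land on the same slice, so a join on user_id between two
# tables distributed this way needs no shuffling across the cluster.
def target_slice(user_id: str, num_slices: int = 8) -> int:
    digest = hashlib.md5(user_id.encode()).hexdigest()
    return int(digest, 16) % num_slices

print(target_slice("user0001"))  # the same user_id maps to the same slice every time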

Re: sort keys, the motivating idea is fast loading from storage across multiple machines. The sort key determines how rows are sorted in each machine. Running a query with a filter on the sort key speeds things up because only a smaller range of blocks needs to be read. In our example, we may be interested only in events in a specific date range; event_ts would thus make a good candidate for the sort key. Less scanning means faster queries.

To create the table with user_id as the distribution key and event_ts as the sort key, specify the distribution and sort key details at the end.

CREATE TABLE events (
    event_id      VARCHAR,
    event_ts      TIMESTAMP,
    user_id       VARCHAR,
    browser_id    VARCHAR
)
DISTSTYLE KEY
DISTKEY (user_id)
SORTKEY (event_ts);

There’s definitely more to be said on Redshift (AWS post here) or how things work ‘under the hood’ more generally: Redshift vs Postgres (fast aggregates vs fast single row access), key-value stores, compact formats for data-at-rest like Parquet or data-over-the-wire like Protocol Buffers - more suggestions in the section below. It’s my vote of confidence for blub studies.

Write code

OK, queries are now faster and you’ve bought yourself some time to write code. While offering to rewrite the stack in Rust might be a hard sell, setting up a framework to improve data quality is something everyone can get on board with.

📈

How do we improve data quality?

The first is by preventing bad data. Suppose event_id should never be null. To prevent this, we can introduce data quality checks to raise a runtime error if there are null values in the column. This way the rows in question are not inserted into the final table, but remain in a staging table for closer review. We’ll refer back to data quality checks at the end of this section.
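As a rough sketch of what such a check could look like (assuming a DB-API cursor connected to the warehouse, e.g. from psycopg2, and a hypothetical staging table named staging_events):

def check_no_nulls(cursor, table: str, column: str) -> None:
    """Raise a runtime error if the column contains nulls, blocking promotion to the final table."""
    cursor.execute(f"SELECT COUNT(*) FROM {table} WHERE {column} IS NULL")
    null_count = cursor.fetchone()[0]
    if null_count:
        raise ValueError(
            f"{table}.{column} has {null_count} null rows; leaving them in staging for review"
        )

# Run before inserting staging rows into the final events table.
check_no_nulls(cursor, "staging_events", "event_id")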

The second is by preventing bad queries. Suppose we have the following rows in the events table (remaining columns omitted for clarity).

 user_id  | browser_id
----------+------------
          | brws0001
 user0001 | brws0002

Now let’s say our query looks for browser_id seen with a non-null user_id. If our query returns the pair brws0001 - user0001, then we know something is wrong. To protect ourselves against this, we can borrow a pattern we see all the time in software: unit testing.

Here’s how it works.

  1. Set up mock input table
  2. Insert rows above
  3. Run query
  4. Read from output table
  5. Assert output row has desired value

To do this you might want to set up a local Postgres instance to avoid calling Redshift; this way all tests can be run locally. Sounds like a lot of work? Good, you’ll get to write lots of code. In fact, a SQL unit testing framework is a great opportunity to try out a few novel programming ideas.
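A minimal sketch of one such test, assuming a local Postgres database named test_db and a query under test that pairs browsers with non-null users (all names here are illustrative):

import sqlalchemy

engine = sqlalchemy.create_engine("postgresql://localhost/test_db")

browser_user_query = """
    SELECT browser_id, user_id
    FROM events
    WHERE user_id IS NOT NULL
"""


def test_browser_user_pairs():
    with engine.begin() as conn:
        # Steps 1 and 2: mock input table plus the rows above
        conn.execute(sqlalchemy.text(
            "CREATE TEMP TABLE events (user_id VARCHAR, browser_id VARCHAR)"
        ))
        conn.execute(sqlalchemy.text(
            "INSERT INTO events VALUES (NULL, 'brws0001'), ('user0001', 'brws0002')"
        ))
        # Steps 3 and 4: run the query and read the output
        rows = conn.execute(sqlalchemy.text(browser_user_query)).fetchall()

    # Step 5: assert the output row has the desired value
    assert rows == [("brws0002", "user0001")]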

ORMs

I first came across ORMs (Object Relational Mapping) when building web apps. ORMs like SQLAlchemy let you manipulate the data model as Python classes, as opposed to having to write SQL (or rather, custom string formatting functions that output SQL strings).

In setting up Postgres for testing, we can leverage the SQLAlchemy engine and connection objects to abstract away database interactions. In other words, we can write the same code whether we’re interacting with Redshift (in production) or Postgres (in tests).
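In a sketch (the URLs are placeholders, and the Redshift dialect assumes the sqlalchemy-redshift package is installed), the only thing that changes between environments is the connection URL:

import os

import sqlalchemy

# Placeholder URLs: Postgres locally for tests, Redshift in production.
if os.environ.get("ENV") == "test":
    url = "postgresql://localhost/test_db"
else:
    url = "redshift+psycopg2://user:password@cluster.example.com:5439/warehouse"

engine = sqlalchemy.create_engine(url)


def count_events(engine: sqlalchemy.engine.Engine) -> int:
    # Downstream code only sees the engine, so the same function runs in both environments.
    with engine.connect() as conn:
        return conn.execute(sqlalchemy.text("SELECT COUNT(*) FROM events")).scalar()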

We can also leverage SQLAlchemy table and column objects. For example, we can define our events table above as follows.

import sqlalchemy
from sqlalchemy.orm import declarative_base


class Table:
    # Annotate __table__ so type checkers know every model exposes its schema.
    __table__: sqlalchemy.Table


Base = declarative_base(cls=Table)


class Events(Base):
    __tablename__ = "events"

    event_id = sqlalchemy.Column(sqlalchemy.String, primary_key=True)
    event_ts = sqlalchemy.Column(sqlalchemy.DateTime)
    user_id = sqlalchemy.Column(sqlalchemy.String)
    browser_id = sqlalchemy.Column(sqlalchemy.String)

Code generation

We can access Redshift to see what the column names and types are, and manually type them into the table schema file. That’s no fun.

We could instead write code that generates more code. The sqlacodegen library lets you generate SQLAlchemy table schemas from tables in the database. This works well as a script: the table name of interest is passed as input, and running the script loads the schema from Redshift, converts it to a SQLAlchemy table, and writes it out to a Python file.
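A rough sketch of such a script; the --tables and --outfile options and the REDSHIFT_URL environment variable are assumptions to check against your installed sqlacodegen version.

import os
import subprocess
import sys


def generate_schema(table_name: str) -> None:
    """Generate a SQLAlchemy schema file for one table via the sqlacodegen CLI."""
    subprocess.run(
        [
            "sqlacodegen",
            os.environ["REDSHIFT_URL"],  # e.g. redshift+psycopg2://...
            "--tables", table_name,
            "--outfile", f"schemas/{table_name}.py",
        ],
        check=True,
    )


if __name__ == "__main__":
    generate_schema(sys.argv[1])  # python generate_schema.py events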

Fixtures

Now that we have our mock tables, let’s take a closer look at inserting rows. A row can be represented as a dictionary, where the keys are column names and the values are the column values.

import datetime

row = dict(
    event_id="evnt0001",
    event_ts=datetime.datetime(2021, 1, 1, 0, 0, 0),
    user_id="user0001",
    browser_id="brws0001",
)

When we need two rows, we can copy-and-paste boilerplate code. The problem here is we have to specify every key-value pair, which doesn’t help highlight user_id and browser_id as the columns we care about.

rows = [
    dict(
        event_id="evnt0001",
        event_ts=datetime.datetime(2021, 1, 1, 0, 0, 0),
        user_id="user0001",
        browser_id="brws0001",
    ),
    dict(
        event_id="evnt0002",
        event_ts=datetime.datetime(2021, 1, 1, 0, 0, 0),
        user_id=None,
        browser_id="brws0002",
    ),
]

Since we have our events table above, why not use it? We can create a function that takes in a generic table schema and returns a row from the table. The init_column function here can either generate a random identifier (with a common prefix based on the column name) or a default value based on the type.

import typing

ColumnType = typing.Union[
    None, str, bool, int, float, datetime.date, datetime.datetime
]


def init_row(table_schema: typing.Type[Base]) -> typing.Dict[str, ColumnType]:
    """Build a row with a generated default for every column in the table schema."""
    row: typing.Dict[str, ColumnType] = {}

    for column in table_schema.__table__.columns:
        row[column.name] = init_column(column.name)

    return row
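init_column is left open above; here’s one possible sketch, under the assumption that defaults are inferred from the column name alone: timestamp columns get a fixed timestamp, everything else gets a short random identifier prefixed by the column name.

import datetime
import random
import string


def init_column(column_name: str) -> ColumnType:
    """Generate a default value for a column, keyed off its name (an assumption)."""
    if column_name.endswith("_ts"):
        return datetime.datetime(2021, 1, 1, 0, 0, 0)
    prefix = column_name.split("_")[0][:4]
    suffix = "".join(random.choices(string.ascii_letters + string.digits, k=4))
    return prefix + suffix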

For our desired row, we can simply override the column values.

row = init_row(Events)
row["user_id"] = "user0001"
row["browser_id"] = "brws0002"
print(row)
{
    'event_id': 'evntxF7z',
    'event_ts': datetime.datetime(2021, 1, 1, 0, 0),
    'user_id': 'user0001',
    'browser_id': 'brws0002'
}

Functional programming

It’s a little repetitive to call init_row multiple times when we need multiple rows. To generate multiple rows from a single function call, we can set up a row factory.

The init_row function takes in a table schema as input and outputs a row. Let’s kick this up a notch. We can define a function that takes in a table schema and outputs a custom function. The custom function takes in a dictionary with the override column values, and returns the row with the given override column values where specified and pre-set values elsewhere.

This might be easier to explain in code.

ColumnTypeDict = typing.Dict[str, ColumnType]


def init_row_factory(
    table_schema: typing.Type[Base],
) -> typing.Callable[[ColumnTypeDict], ColumnTypeDict]:
    """Return a function that builds rows for the given table schema."""
    row_initial = init_row(table_schema)

    def closure(column_values: ColumnTypeDict) -> ColumnTypeDict:
        """Return a row with the given overrides applied on top of the pre-set values."""
        row_final = row_initial.copy()
        row_final.update(column_values)

        return row_final

    return closure

Let’s try it out.

row_factory = init_row_factory(Events)
row = row_factory(dict(user_id="user0001", browser_id="brws0002"))
print(row)
{
    'event_id': 'evntCX51',
    'event_ts': datetime.datetime(2021, 1, 1, 0, 0),
    'user_id': 'user0001',
    'browser_id': 'brws0002'
}

Meta-programming

Let’s recap. We now have a SQL unit testing framework set up to ensure queries are correctly written, and data quality checks to provide confidence that data in production tables has the desired properties.

🏆

How do we involve consumers in data quality checks?

The purpose here is two-fold. First, consumers have the best context to know when data looks ‘off’, and thus what checks to put in place. Second, making check creation self-serve helps everyone feel invested in the data quality.

Suppose we have the following check.

def expect_non_null_column(rows: typing.List[ColumnTypeDict], column_name: str) -> None:
    """Raise if any row has a null value in the given column."""
    assert None not in [row[column_name] for row in rows]

We set this up for our events table to make sure event_id is not null.

expect_non_null_column(rows, "event_id")

It might not be too difficult for consumers to set this up, but let’s make this even more intuitive. We can set up a ‘parallel’ process so that checks can be defined in a YAML config file.

expect_non_null_column:
    column_name: event_id

To prevent configs from being incorrectly set up, we can create a Python test to load the config and compare it against the Python definition. In particular, we can use the function signature (via __annotations__) to ensure that the config values have the same types as our function.
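A sketch of that comparison, assuming the config above lives in checks.yaml and a hypothetical CHECKS registry maps config keys to functions (only simple annotations like str are handled here):

import yaml

# Hypothetical registry mapping config keys to check functions.
CHECKS = {"expect_non_null_column": expect_non_null_column}


def test_check_config_matches_signatures():
    with open("checks.yaml") as f:
        config = yaml.safe_load(f)

    for check_name, kwargs in config.items():
        check = CHECKS[check_name]  # fails loudly on an unknown check name
        for arg_name, arg_value in kwargs.items():
            expected_type = check.__annotations__[arg_name]
            # Only simple types (str, int, ...) are checked in this sketch.
            assert isinstance(arg_value, expected_type), (
                f"{check_name}.{arg_name}: expected {expected_type}, got {type(arg_value)}"
            )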

Take courses

Donald Knuth says it best.

If you find that you’re spending almost all your time on theory, start turning some attention to practical things; it will improve your theories. If you find that you’re spending almost all your time on practice, start turning some attention to theoretical things; it will improve your practice.

If you have the budget and can fit in a part-time course, I would recommend Bradfield. If you don’t have the budget but can take time off, I would recommend Recurse Center. If there’s one book I’d recommend (for RC or otherwise), it’s Designing Data-Intensive Applications by Martin Kleppmann. Chip Huyen has also made available her Data Engineering 101 notes for free.

Epilogue

Think of your job as a relationship. When things get bumpy, is breaking up the first thing that comes to mind? There’s nothing stopping you, but maybe you want to see if you can make it work. At the very least, consider the opportunity cost: at a new job you’d have to re-learn the stack before you could implement any of the cool things we just talked about.

The data producer vs consumer divide has its share of drama and politics (the other divide is moving from writing SQL to training machine learning models, perhaps for another post). Everyone is trying to figure it out. Give your manager the benefit of doubt. Trust your colleagues have the best intentions. Keep your head up, keep shipping. Things will get better.

Imagine this scenario a year from now. Queries are fast. Data is trustworthy. Everyone gets to chip in. That looks like a promotion packet.


