Introduction
Notebooks are not really good for writing software!
Notebooks are primarily a tool from the fields of Analytics, Data Science, and Machine Learning. The primary focus in these areas is on data:
- Data should be understood
- Models should be trained
- Data and models should be evaluated
Additionally, these fields have a particularly high rate of career changers.
Notebooks are precisely the right thing when it comes to directly addressing a use case, presenting results, or possibly granting access to stakeholders, without having to deal with techniques like modularization, testing, web servers, etc.
The demands in software development are entirely different. At this point, I want to put just one argument on the table that directly illustrates why notebooks do not work for real software development (not even in Databricks):
Good software development is testable: through unit tests, integration tests, as well as end-to-end tests! Notebooks only allow for end-to-end tests.
Here is a simple example:
# Databricks notebook source
from pyspark.sql import DataFrame

# COMMAND ----------

def read_table(catalog: str, schema: str, table: str) -> DataFrame:
    return spark.table(f"{catalog}.{schema}.{table}")

# COMMAND ----------

foo_df = read_table("some_catalog", "some_schema", "foo")

# COMMAND ----------

display(foo_df)
This is a “simple”, non-modularized Databricks Notebook, which is only supposed to load a table as a DataFrame and display it.
The notebook can (if at all) be tested end-to-end. It is not easily possible
to test the read_table method in isolation, because importing it would always
execute the entire notebook (with from notebook import read_table, all imports,
declarations, initializations, expressions, and statements are executed).
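This import behavior can be demonstrated without Databricks at all. Here is a minimal, plain-Python sketch (the module name notebook_like and its contents are made up for illustration):

```python
import sys
import tempfile
import textwrap
from pathlib import Path

# A notebook-style module: a function definition plus top-level statements.
module_source = textwrap.dedent("""
    side_effects = []

    def read_table(name):
        return f"reading {name}"

    # Top-level statement, comparable to a notebook cell below the function:
    side_effects.append("expensive cell ran on import!")
""")

tmp_dir = tempfile.mkdtemp()
Path(tmp_dir, "notebook_like.py").write_text(module_source)
sys.path.insert(0, tmp_dir)

# Merely importing the function triggers every top-level statement.
from notebook_like import read_table, side_effects

print(side_effects)   # ['expensive cell ran on import!']
print(read_table("foo"))  # reading foo
```

Even though we only wanted read_table, the "expensive cell" ran as well. On Databricks, that cell might be a full table load or a write.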
Now, one can go ahead and modularize the method:
# src/util.py
from pyspark.sql import DataFrame, SparkSession

def read_table(
    catalog: str,
    schema: str,
    table: str,
    *,
    spark: SparkSession,
) -> DataFrame:
    return spark.table(f"{catalog}.{schema}.{table}")
And the adapted notebook:
# Databricks notebook source
# MAGIC %load_ext autoreload
# MAGIC %autoreload 2
# COMMAND ----------
from src.util import read_table
# COMMAND ----------
foo_df = read_table("some_catalog", "some_schema", "foo", spark=spark)
# COMMAND ----------
display(foo_df)
The code is now modularized, and modules are testable: read_table can be unit
tested here.
Nevertheless, the workflow for software development is not optimal. It’s a dilemma:
- The entry point into code on Databricks is always a notebook. However, code is actually written in an IDE.
- The Databricks web interface does not allow code to be debugged (e.g., setting breakpoints).
Databricks tries to counteract this dilemma and provides, especially for VS Code, a superbly integrated Databricks extension: code can be written and debugged locally, while the code itself is executed remotely on Databricks. “Problems” with this approach:
- Offline coding is hardly possible, or only with difficulty.
- Compute clusters incur costs depending on how they are sized.
In a way, one is looking for a silver bullet. In the following, I would like to present Test-Driven Development as precisely this.
Test Driven Development
In strict Test-Driven Development, tests are written before the code is written (also known as the “Red-Green-Refactor” cycle). Initially, a test fails. With the correct implementation of the code, the tests will then pass successfully.
However, I don’t believe in the principle of “strict” TDD. In practice, I have never experienced it being truly followed through. There are concepts where one person writes the tests and another person writes the features for these tests. But I have never experienced that in practice either.
In less strict Test-Driven Development, testing is seen as an elementary component of software development, where larger features are only considered complete once they have been (at least partially) tested.
I subscribe to the principle of “less strict” TDD 100% and often practice it this way. And it is precisely this approach that I would like to recommend to you in the following for a good workflow on Databricks.
TDD On Databricks Level 1
Now, we’ve taken a long run-up just to bring the following point home: there are certain hurdles in Databricks that can be circumvented by writing tests. At the same time, writing tests is an elementary part of the development process in many environments anyway. So, here’s a “simple” introduction to TDD with Databricks-specific or Spark code.
Project Structure
- /notebooks/some_notebook
- /src/__init__.py
- /src/util.py
- /tests/test_util.py
- /tests/fixtures/
- .gitignore
Providing Data as Fixtures
At the beginning of a project, a less sensitive dataset can be downloaded as a Parquet file and made available to the project.
Suppose there is a less sensitive table some_catalog.some_schema.some_table with
a few hundred or thousand rows. This can either be downloaded directly from
Databricks as a .csv file, or indirectly as a .parquet file (e.g., by
downloading it from Azure Blob Storage).
The table file is then placed in the following path:
/tests/fixtures/unity_catalog/some_catalog/some_schema/some_table.parquet
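As a sketch, such fixture paths can be derived programmatically from a fully qualified table name (fixture_path is a hypothetical helper, not part of the project so far):

```python
from pathlib import Path

# Root of the local Unity Catalog fixture layout described above.
FIXTURES_UC_ROOT = Path("tests/fixtures/unity_catalog")

def fixture_path(fully_qualified_table_name: str) -> Path:
    """Map catalog.schema.table onto the fixture directory layout."""
    catalog, schema, table = fully_qualified_table_name.split(".")
    return FIXTURES_UC_ROOT / catalog / schema / f"{table}.parquet"

print(fixture_path("some_catalog.some_schema.some_table").as_posix())
# tests/fixtures/unity_catalog/some_catalog/some_schema/some_table.parquet
```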
And the .gitignore can be supplemented with one line:
/tests/fixtures/unity_catalog/*
The reason for this line in the .gitignore is that tables can become
arbitrarily large, and we don’t necessarily want to commit 100MB-sized tables
into our projects.
The disadvantage of this, of course, is that CI/CD pipelines for automated tests become more complex, and the general project setup is more complicated.
Everything has its pros and cons… at this point, I don’t want to express an explicit preference, but just point it out.
Create util.py file
Next, either the test can be written (strict TDD), or a small part of the code. I find it nicer at this stage to write some code first:
from pyspark.sql import DataFrame, SparkSession

def read_table(
    fully_qualified_table_name: str,
    *,
    spark: SparkSession,
) -> DataFrame:
    return spark.table(fully_qualified_table_name)
Create test_util.py
At this point, it can be useful to write the first test. Not only to test this not-very-complex function, but also to have an initial draft for a test suite that can be expanded:
from pyspark.sql import SparkSession
from chispa.dataframe_comparer import assert_df_equality
from pathlib import Path

from src.util import read_table

UNITTEST_ROOT = Path(__file__).parent
FIXTURES_ROOT = UNITTEST_ROOT / "fixtures"
FIXTURES_UC_ROOT = FIXTURES_ROOT / "unity_catalog"


class TestReadTable:
    def setup_method(self):
        self.spark = SparkSession.builder.getOrCreate()
        table_path = (
            FIXTURES_UC_ROOT
            / "some_catalog"
            / "some_schema"
            / "some_table.parquet"
        )
        fully_qualified_table_name = "some_catalog__some_schema__some_table"
        self.table_df = (
            self.spark.read.format("parquet")
            .load(table_path.as_posix())
        )
        self.table_df.createOrReplaceTempView(fully_qualified_table_name)

    def test_read_table(self):
        df = read_table(
            fully_qualified_table_name="some_catalog__some_schema__some_table",
            spark=self.spark,
        )
        assert_df_equality(df, self.table_df)
setup_method is executed before each individual test function. Alternatively,
there is also setup_class, which is executed only once per class. Especially
when working with data, unpleasant, unwanted dependencies between tests occur
repeatedly (imagine if, besides the read method, there were also count and
delete methods to test: it can make a difference whether the count method
is tested first and then the delete method, if both operate on the same
temporary table).
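The difference between the two hooks can be sketched in plain Python by calling them the way pytest does (the simulation loop below is illustrative; under pytest, these calls are implicit):

```python
class LifecycleDemo:
    class_setups = 0
    method_setups = 0

    @classmethod
    def setup_class(cls):
        cls.class_setups += 1          # runs once for the whole class

    def setup_method(self, method=None):
        type(self).method_setups += 1  # runs before every single test

    def test_count(self):
        pass

    def test_delete(self):
        pass


# Simulation of a pytest run over the two tests above:
LifecycleDemo.setup_class()
for test_name in ("test_count", "test_delete"):
    instance = LifecycleDemo()
    instance.setup_method()
    getattr(instance, test_name)()

print(LifecycleDemo.class_setups, LifecycleDemo.method_setups)  # 1 2
```

With setup_method, each test gets a fresh state, which is exactly what prevents the count/delete ordering problem described above.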
In setup_method, a Spark session is opened first. This could also be well
placed in a setup_class method. I’m keeping it simple here.
Subsequently, the reference table is loaded from the local file system and
registered as a temporary view within the Spark session. This view lives only
for the duration of the session and does not live in a catalog or a schema.
Apart from that, however, it can be read using common Spark methods, such as
spark.sql("SELECT * FROM some_catalog__some_schema__some_table") or,
analogously, spark.table("some_catalog__some_schema__some_table").
The test_read_table method then logically tests the read method.
Summary
At this point, I would like to note: I’m not concerned with the meaningfulness or quality of the test. I’m concerned with a different point: I can debug! (And all this without cloud compute or a Databricks extension.) Directly from an IDE, a breakpoint is set, and real, actual Spark code is being debugged.
In doing so, I can also jump into my modules and debug components specifically, without working with debug logs or prints.
If you have read this article attentively up to this point, you might have noticed a small “cheat”:
The original from the introduction:
def read_table(
    catalog: str,
    schema: str,
    table: str,
    *,
    spark: SparkSession,
) -> DataFrame:
    return spark.table(f"{catalog}.{schema}.{table}")
The simplification for the test:
def read_table(
    fully_qualified_table_name: str,
    *,
    spark: SparkSession,
) -> DataFrame:
    return spark.table(fully_qualified_table_name)
The code was simplified for the sake of the test. createOrReplaceTempView only
offers one table level and no catalog or schema levels. However, the
original read_table method specifically requires catalog and schema!
Please note the following at this point: Good code is always testable. However, one should never have to build in workarounds for the sake of tests that are not needed / not intended in production.
This will be smoothed out in Level 2.
TDD on Databricks Level 2
Steps 1 and 2 are identical to Level 1. There is a small difference in the third step.
util.py File
Now, the original version of the read_table method should be used, which
requires a catalog and schema:
from pyspark.sql import DataFrame, SparkSession

def read_table(
    catalog: str,
    schema: str,
    table: str,
    *,
    spark: SparkSession,
) -> DataFrame:
    return spark.table(f"{catalog}.{schema}.{table}")
Rewrite test_util.py File
The entry point for writing a test is now even better than before! Here, the cornerstone for meaningful mocking can be laid:
from pyspark.sql import SparkSession
from chispa.dataframe_comparer import assert_df_equality
from pathlib import Path

from src.util import read_table

UNITTEST_ROOT = Path(__file__).parent
FIXTURES_ROOT = UNITTEST_ROOT / "fixtures"
FIXTURES_UC_ROOT = FIXTURES_ROOT / "unity_catalog"


class MissingMockException(Exception):
    pass


class SparkSessionTestSuite(SparkSession):
    class Builder(SparkSession.Builder):
        def getOrCreate(self):
            spark = super().getOrCreate()
            return SparkSessionTestSuite(
                sparkContext=spark.sparkContext,
            )

    def _get__some_catalog__some_schema__some_table(self):
        path = (
            FIXTURES_UC_ROOT
            / "some_catalog"
            / "some_schema"
            / "some_table.parquet"
        )
        return self.read.format("parquet").load(path.as_posix())

    def table(self, tableName):
        if tableName == "some_catalog.some_schema.some_table":
            return self._get__some_catalog__some_schema__some_table()
        raise MissingMockException(
            f"Table {tableName} not found in mocked Spark session"
        )


class TestReadTable:
    def setup_class(self):
        self.spark = SparkSessionTestSuite.builder.getOrCreate()

    def test_read_table(self):
        df = read_table(
            catalog="some_catalog",
            schema="some_schema",
            table="some_table",
            spark=self.spark,
        )
        expected_df = self.spark._get__some_catalog__some_schema__some_table()
        assert_df_equality(df, expected_df)
Compared to the previous test, there are some adjustments:
- The setup method has been greatly simplified: only a Spark session is needed, and a special one for the test case at that! The DataFrame is no longer loaded into the session directly in the setup. A note on transparency: the test is currently written in such a way that the entire DataFrame has to be read twice for one test. This doesn’t hurt with small DataFrames, but for larger ones, this can still be optimized.
- The class SparkSessionTestSuite inherits from the normal SparkSession. All methods of this class are retained, except those that we override.
- The table method is overridden specifically for the test (and potentially for others). This way, a single function no longer needs to be adapted to the test case; instead, the test environment is adapted to the production conditions. In the current implementation, there’s even untapped potential: the table method can be overridden to explicitly ensure it always accepts a fully qualified table name (e.g., assert len(tableName.split(".")) == 3). But this, too, requires an individual decision.
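The subclass-and-override idea behind this can be reduced to plain Python so it runs anywhere (CatalogClient is a hypothetical stand-in for SparkSession, invented here for illustration):

```python
class CatalogClient:
    """Stand-in for the real client; the real table() needs a live catalog."""
    def table(self, table_name: str) -> list:
        raise RuntimeError("would require a connection to Unity Catalog")


class MissingMockException(Exception):
    pass


FIXTURES = {
    "some_catalog.some_schema.some_table": [{"id": 1}, {"id": 2}],
}


class CatalogClientTestSuite(CatalogClient):
    """Inherits everything, overrides only the lookup that needs mocking."""
    def table(self, table_name: str) -> list:
        if table_name in FIXTURES:
            return FIXTURES[table_name]
        # Fail loudly when a test touches a table that was not mocked.
        raise MissingMockException(table_name)


client = CatalogClientTestSuite()
print(client.table("some_catalog.some_schema.some_table"))
# [{'id': 1}, {'id': 2}]
```

Production code keeps calling table() with a fully qualified name; only the environment handed to it changes.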
There are certain things that only work on Databricks or in the cloud. Reading from Unity Catalog is one possibility, creating tables as external tables is another. Here, the Spark test suite may need to be continuously extended, and possibly other classes adapted as well. However, this goes beyond the scope of this introduction.
At this point, however, it is “good enough.” The goal of this guide is not to explain good tests, but to use tests as a tool for a practicable way of working with Databricks or Spark. When in doubt, not everything needs to be tested down to the last detail. Sometimes, a test is also simply an entry point for the debugger, helping you to step through transformations one by one, up to – and possibly even excluding – a write process.
TDD on Databricks Level 3
Okay… I’m in the flow right now, and we’re very close to an optimal solution. So, I’m going to wrap this part up here! What if we want to save tables and test exactly that, and be able to debug it completely?
Let me be honest at this point: this is where things get a bit wild. And you really need to have the desire and the time for it. I’ve written many tests in my projects over the last four years and am used to bending my environment into shape for tests. It works very well for me! But finding the right methods to override is sometimes a real art. Here, I work a lot by feel and make many mistakes & inaccuracies myself. Feel free to consider this section as “nice to know.” Don’t feel obligated to follow along here, especially if you end up spending more time getting your test suite running than implementing the actual feature!
In principle, this concept is just an addition to Level 2, so it connects directly to it.
Extend util.py File
A table is now to be written in a helper function. The code for this follows directly here:
from pyspark.sql import DataFrame, SparkSession

def read_table(
    catalog: str,
    schema: str,
    table: str,
    *,
    spark: SparkSession,
) -> DataFrame:
    return spark.table(f"{catalog}.{schema}.{table}")

def save_df_as_table(
    catalog: str,
    schema: str,
    table: str,
    df: DataFrame,
    *,
    spark: SparkSession,
):
    (
        df.write.format("delta")
        .mode("overwrite")
        .saveAsTable(f"{catalog}.{schema}.{table}")
    )
Extend the Test File
The tests are now extended to cover the new save_df_as_table helper. The code follows directly here:
import pytest
from py4j.java_gateway import JavaObject
from pyspark.sql import (
    SparkSession,
    DataFrame,
    DataFrameWriter,
    DataFrameReader,
)
from chispa.dataframe_comparer import assert_df_equality
from pathlib import Path

from src.util import read_table, save_df_as_table

UNITTEST_ROOT = Path(__file__).parent
FIXTURES_ROOT = UNITTEST_ROOT / "fixtures"
FIXTURES_UC_ROOT = FIXTURES_ROOT / "unity_catalog"


class MissingMockException(Exception):
    pass


class DataFrameWriterTestSuite(DataFrameWriter):
    def saveAsTable(self, *args, **kwargs):
        table_name = args[0].replace(".", "__")
        self._df.createOrReplaceTempView(table_name)


class DataFrameTestSuite(DataFrame):
    @property
    def write(self):
        return DataFrameWriterTestSuite(self)


class DataFrameReaderTestSuite(DataFrameReader):
    def _df(self, jdf: JavaObject) -> DataFrame:
        return DataFrameTestSuite(jdf, self._spark)


class SparkSessionTestSuite(SparkSession):
    class Builder(SparkSession.Builder):
        def getOrCreate(self):
            spark = super().getOrCreate()
            return SparkSessionTestSuite(
                sparkContext=spark.sparkContext,
            )

    def _get__some_catalog__some_schema__some_table(self):
        path = (
            FIXTURES_UC_ROOT
            / "some_catalog"
            / "some_schema"
            / "some_table.parquet"
        )
        return self.read.format("parquet").load(path.as_posix())

    def table(self, tableName):
        if tableName == "some_catalog.some_schema.some_table":
            return self._get__some_catalog__some_schema__some_table()
        try:
            return super().table(tableName)
        except Exception:
            # Catching a broad exception; Spark's AnalysisException would be better.
            raise MissingMockException(
                f"Table {tableName} not found in mocked Spark session or as temp view"
            )

    @property
    def read(self) -> DataFrameReaderTestSuite:
        return DataFrameReaderTestSuite(self)


class TestUtilFunctions:
    def setup_method(self):
        self.spark = SparkSessionTestSuite.builder.getOrCreate()

    def test_read_table(self):
        df = read_table(
            catalog="some_catalog",
            schema="some_schema",
            table="some_table",
            spark=self.spark,
        )
        expected_df = self.spark._get__some_catalog__some_schema__some_table()
        assert_df_equality(df, expected_df)

    def test_save_df_as_table(self):
        df_to_save = self.spark._get__some_catalog__some_schema__some_table()
        # Verify that the table does not yet exist as a temp view under the
        # "mangled" name. Note: production uses some_catalog.some_schema.some_table;
        # the DataFrameWriterTestSuite mock maps this to
        # some_catalog__some_schema__some_table for the temp view.
        with pytest.raises(Exception):
            self.spark.table("some_catalog__some_schema__some_table")
        save_df_as_table(
            catalog="some_catalog",
            schema="some_schema",
            table="some_table",
            df=df_to_save,
            spark=self.spark,
        )
        # Now check that the temp view was created by the mocked saveAsTable.
        expected_saved_df = self.spark.sql(
            "SELECT * FROM some_catalog__some_schema__some_table"
        )
        assert_df_equality(df_to_save, expected_saved_df)
- Some classes have been adapted in this example. However, the test case test_read_table has not. And that’s a good thing! It’s always a bad sign if you have to adapt existing tests for the sake of new tests. It’s not always avoidable, but I explicitly made sure here that I didn’t have to do it.
- Targeted overriding of the read property in SparkSessionTestSuite. Originally, spark.read returns a standard DataFrameReader. However, a DataFrame is to be saved as a table; for this, the DataFrame.write.saveAsTable method is used. The chain DataFrame.write and then DataFrameWriter.saveAsTable needs to be specifically overridden.
- Creation of a DataFrameReaderTestSuite class. Here, the method _df() is overridden. The rationale for overriding this method is a mixture of intuition and debugging. In principle, I looked at what happens in the load() method of self.read.format("parquet").load(path.as_posix()) (I jumped into the method with the IDE and saw that a self._df() is always returned there).
- Creation of a DataFrameTestSuite to control the write property. DataFrameTestSuite.write returns a DataFrameWriterTestSuite instance. This is where the saveAsTable method lives, which needs to be specifically overridden.
- Creation of a DataFrameWriterTestSuite that specifically overrides the saveAsTable method. It now ensures that the table is written neither to the Hive Metastore nor to the Unity Catalog, but is stored as a temporary view instead.
- test_save_df_as_table: The goal of this test is to test the new method util.save_df_as_table. To do this, it first ensures that the table some_catalog__some_schema__some_table does not yet exist in the Spark session (that’s where the mock is expected to place it for the test case). Then, the table is persisted using the helper method, and this is successfully verified.
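The write interception chain can also be sketched in plain Python (FakeDataFrame and FakeWriterTestSuite are made-up stand-ins for the pyspark classes; the name mangling matches the mock above):

```python
TEMP_VIEWS = {}  # stands in for the session's temporary views


class FakeWriterTestSuite:
    """Plays the role of DataFrameWriterTestSuite."""
    def __init__(self, df):
        self._df = df

    def saveAsTable(self, name):
        # Instead of persisting to a catalog, register an in-memory "view";
        # dots in the fully qualified name become double underscores.
        TEMP_VIEWS[name.replace(".", "__")] = self._df


class FakeDataFrame:
    def __init__(self, rows):
        self.rows = rows

    @property
    def write(self):
        # Mirrors DataFrameTestSuite.write returning the overridden writer.
        return FakeWriterTestSuite(self)


df = FakeDataFrame([{"id": 1}])
df.write.saveAsTable("some_catalog.some_schema.some_table")
print(sorted(TEMP_VIEWS))  # ['some_catalog__some_schema__some_table']
```

The production call site (df.write.saveAsTable(...)) stays exactly the same; only the classes behind the chain are swapped.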
Reusing the Test Infrastructure
Last section, I promise! There’s still one small problem: we’ve invested an
extreme amount of effort in testing for one module (util.py). It would be great
if this effort could also be reused for other modules. The test structure can
be easily adapted for this.
First, a conftest.py file is created. Classes and mocks are stored there and
correspondingly removed from the test_util.py file:
# File: tests/unittest/conftest.py
import pytest
from py4j.java_gateway import JavaObject
from pyspark.sql import (
    SparkSession,
    DataFrame,
    DataFrameWriter,
    DataFrameReader,
)
from pathlib import Path

UNITTEST_ROOT = Path(__file__).resolve().parent
FIXTURES_ROOT = UNITTEST_ROOT / "fixtures"
FIXTURES_UC_ROOT = FIXTURES_ROOT / "unity_catalog"


class MissingMockException(Exception):
    pass


class DataFrameWriterTestSuite(DataFrameWriter):
    def saveAsTable(self, *args, **kwargs):
        table_name = args[0].replace(".", "__")
        self._df.createOrReplaceTempView(table_name)


class DataFrameTestSuite(DataFrame):
    @property
    def write(self) -> DataFrameWriterTestSuite:
        return DataFrameWriterTestSuite(self)


class DataFrameReaderTestSuite(DataFrameReader):
    def _df(self, jdf: JavaObject) -> DataFrameTestSuite:
        return DataFrameTestSuite(jdf, self._spark)


class SparkSessionTestSuite(SparkSession):
    class Builder(SparkSession.Builder):
        def getOrCreate(self):
            spark = super().getOrCreate()
            return SparkSessionTestSuite(
                sparkContext=spark.sparkContext,
            )

    def _get__some_catalog__some_schema__some_table(self) -> DataFrameTestSuite:
        path = (
            FIXTURES_UC_ROOT
            / "some_catalog"
            / "some_schema"
            / "some_table.parquet"
        )
        # This uses the overridden self.read, which returns a
        # DataFrameReaderTestSuite and thus a DataFrameTestSuite for the
        # loaded DataFrame.
        return self.read.format("parquet").load(path.as_posix())

    def table(self, tableName: str) -> DataFrameTestSuite:
        if tableName == "some_catalog.some_schema.some_table":
            return self._get__some_catalog__some_schema__some_table()
        try:
            temp_view_df = super().table(tableName)
            return DataFrameTestSuite(temp_view_df._jdf, self)
        except Exception:
            # Be more specific with Spark's AnalysisException if possible.
            raise MissingMockException(
                f"Table {tableName} not found in mocked Spark session or as temp view"
            )

    @property
    def read(self) -> DataFrameReaderTestSuite:
        return DataFrameReaderTestSuite(self)


@pytest.fixture(scope="session")
def spark() -> SparkSessionTestSuite:
    spark_session = SparkSessionTestSuite.builder.getOrCreate()
    yield spark_session
    spark_session.stop()
Please note: A rather inconspicuous function has appeared at the end of the file:
@pytest.fixture(scope="session")
def spark():
    spark_session = SparkSessionTestSuite.builder.getOrCreate()
    yield spark_session
    spark_session.stop()
This initializes the Spark session, which will subsequently be used for various tests.
# File: tests/unittest/test_util.py
import pytest
from chispa.dataframe_comparer import assert_df_equality

# Assuming conftest.py lives in tests/unittest/:
from tests.unittest.conftest import MissingMockException
from src.util import read_table, save_df_as_table


class TestUtilFunctions:
    def test_read_table(self, spark):
        df = read_table(
            catalog="some_catalog",
            schema="some_schema",
            table="some_table",
            spark=spark,
        )
        expected_df = spark._get__some_catalog__some_schema__some_table()
        assert_df_equality(df, expected_df)

    def test_save_df_as_table(self, spark):
        df_to_save = spark._get__some_catalog__some_schema__some_table()
        with pytest.raises(MissingMockException):
            spark.table("some_catalog__some_schema__some_table")
        save_df_as_table(
            catalog="some_catalog",
            schema="some_schema",
            # This name is used by the mock to derive the temp view name:
            table="some_table",
            df=df_to_save,
            spark=spark,
        )
        expected_saved_df = spark.sql(
            "SELECT * FROM some_catalog__some_schema__some_table"
        )
        assert_df_equality(df_to_save, expected_saved_df)
And that’s it! As if by magic, no setup_method or setup_class is needed anymore
to initialize the Spark session. This is now passed directly into the tests.
Pytest takes care of this, by the way. The fixtures from conftest.py are
globally available and are injected into test methods.
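What a session-scoped fixture does can be sketched with a plain generator, driven manually the way pytest drives it (the manual next() calls below only simulate pytest; under pytest they are implicit):

```python
events = []


def spark_fixture():
    events.append("setup")
    yield "fake-spark-session"   # what the tests receive as `spark`
    events.append("teardown")    # runs when the session ends


gen = spark_fixture()
session = next(gen)      # pytest: create on first use
# ... every test in the session reuses `session` here ...
try:
    next(gen)            # pytest: exhaust the generator at session end
except StopIteration:
    pass

print(events)  # ['setup', 'teardown']
```

With scope="session", setup and teardown each run exactly once, no matter how many tests request the fixture.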
However, there is one disadvantage in the current implementation: The test
suite has been written and specialized for util.py. If a module
transformation.py is now added, which also requires a Spark session, then
test_util.py and test_transformation.py might share the same
SparkSessionTestSuite. This can become complicated again… and here too, there
are ways to efficiently avoid this.
But now I’m really drawing a line.
Why not just Notebooks?
One of Databricks’ USPs (Unique Selling Propositions): Notebooks with Python syntax! Here is an example:
# Databricks notebook source
# MAGIC %load_ext autoreload
# MAGIC %autoreload 2
# COMMAND ----------
from src.util import read_table
# COMMAND ----------
foo_df = read_table("some_catalog", "some_schema", "foo", spark=spark)
# COMMAND ----------
foo_df.show()
This notebook can also be executed as a normal Python file in an IDE / editor without any Databricks extension. However, there are significant disadvantages here:
- Databricks itself provides a compute environment. In this compute, there are fully initialized objects or declared functions that can be called without additional imports (examples: spark, dbutils, display()). Locally, workarounds are at least required for these.
- Databricks allows switching between different languages within a notebook (Python, SQL, Markdown, Shell, …). This is an elementary feature of Databricks notebooks that cannot be replicated by a conventional IDE or editor (unless there is a Databricks extension). An example here is # MAGIC %autoreload 2. The IDE ignores this line. On Databricks itself, however, this line has an elementary function.
- The goal of TDD (as discussed) is to provide you with DataFrames so that you can work with DataFrames without a running compute. spark.table() or spark.sql() will not function without access to the Hive Metastore, Unity Catalog, or Azure Blob Storage. The test aims to eliminate this problem through clever mocks.
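For the first point, a common workaround pattern (an assumption on my part, not an official Databricks API) is a local shim that only kicks in when the Databricks-provided globals are missing:

```python
# Minimal local stand-in for a global that Databricks injects at runtime.
try:
    display  # provided by the Databricks runtime
except NameError:
    def display(obj):
        # Local fallback: on Databricks the real display() renders rich tables.
        print(obj)

display("runs locally and on Databricks")
```

The same try/except NameError trick can be applied to spark and dbutils, though those need more elaborate substitutes.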
Fun Fact: Snowflake and Snowpark
Should you ever come into contact with Snowflake and Snowpark, you can use this guide 1:1. Snowpark is heavily inspired by Spark and is identical in many parts!
