This post discusses a few lessons I learned while working with external tables in Spark.

What is an External Table?

An external table, also called an unmanaged table, is created when you define a table over files that already exist on disk. Spark manages only the metadata of an external table. By contrast, calling saveAsTable on a DataFrame without specifying a path creates a managed table, for which Spark tracks both the data and the metadata; the data is stored under the location given by the configuration spark.sql.warehouse.dir. The important difference between the two kinds of tables is the deletion behavior. When you drop a managed table, the underlying data is deleted as well. When you drop an unmanaged table, only the metadata is removed and the data files stay intact.
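One way to check which kind a given table is: the Spark catalog reports a tableType for every table. The sketch below assumes an active SparkSession passed in as spark and a database named dev (the name used later in this post); table_types is a hypothetical helper for illustration, not part of Spark.

```python
def table_types(spark, db_name):
    """Map each table name in `db_name` to the type reported by the catalog.

    Spark reports 'MANAGED' for managed tables and 'EXTERNAL' for
    unmanaged (external) tables.
    """
    return {t.name: t.tableType for t in spark.catalog.listTables(db_name)}


# Usage, assuming `spark` is an active SparkSession:
# table_types(spark, "dev")
```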

When to Use an External Table?

Sometimes an external data ingestion process writes data to cloud storage (e.g., Amazon S3 or Azure ADLS) following a predefined folder structure, i.e., a partition scheme. Copying all of that data to a different location just to create a managed table can be inefficient, especially while the underlying data keeps changing. An external table is preferable in this situation: it is essentially a link to the actual data, and it still allows partition pruning in Spark SQL. A developer therefore does not need to load the data into a DataFrame from specific paths and can instead filter with a SQL WHERE clause directly on the table.
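As a rough illustration of filtering through the table rather than through a path, the sketch below wraps the filter in a small function. It assumes an active SparkSession passed in as spark and the table dev.student with partition column sid, which is the example built in the next section; students_with_sid is a hypothetical helper name.

```python
def students_with_sid(spark, sid, table="dev.student"):
    """Filter on the partition column through the table, not a file path."""
    # int() keeps arbitrary text out of the SQL expression.
    return spark.table(table).where(f"sid = {int(sid)}")


# Usage, assuming `spark` is an active SparkSession and dev.student exists:
# students_with_sid(spark, 2).explain()
# The physical plan should list the sid predicate under PartitionFilters,
# which indicates that only the matching folder is scanned.
```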

Create an External Table

Suppose we have a small dataset created as follows.

import pandas as pd

pdf_student = pd.DataFrame({
  'name': ['a', 'b', 'c'],
  'sid': [1, 2, 3],
  'score': [82, 93, 77]
})

# Assumes an active SparkSession named `spark`
df_student = spark.createDataFrame(pdf_student)

# Write one Parquet folder per sid value (sid=1, sid=2, sid=3)
_ = (
  df_student
  .coalesce(1)
  .write
  .format('parquet')
  .partitionBy('sid')
  .mode('overwrite')
  .save('/path/to/data/student')
)

We can create an external table pointing to this dataset.

_ = spark.catalog.createTable(
  'dev.student',
  path='/path/to/data/student', 
  source='parquet',
)

If we look at the metadata of the table dev.student, we can see that Spark performs partition discovery and identifies sid as the partition column, as expected.

spark.sql("DESCRIBE TABLE EXTENDED dev.student;")

Even though the partition column is detected when the external table is created, the actual partitions are not yet registered in the metastore. We can run either of the following two equivalent commands to let Spark recursively scan the data and find all available partitions.

spark.sql("MSCK REPAIR TABLE dev.student;")      
spark.sql("ALTER TABLE dev.student RECOVER PARTITIONS;")

CAUTION: Both commands list all the files and subfolders under the root path specified in the createTable call, so they can take some time to run if there are a large number of partitions. In addition, it is important to have enough driver memory, since all the metadata is collected on the driver node.
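To confirm what the repair actually registered, SHOW PARTITIONS returns one row per partition, with the spec (for example sid=1) in a column named partition. A minimal sketch, assuming an active SparkSession passed in as spark; parse_partition_spec and list_partitions are hypothetical helper names:

```python
def parse_partition_spec(spec):
    """Turn a spec such as 'sid=1' or 'year=2020/month=3' into a dict.

    Values stay strings, exactly as they appear in the folder names.
    """
    return dict(part.split("=", 1) for part in spec.split("/"))


def list_partitions(spark, table):
    """Return the partitions registered in the metastore for `table`."""
    rows = spark.sql(f"SHOW PARTITIONS {table}").collect()
    return [parse_partition_spec(row["partition"]) for row in rows]


# Usage, assuming `spark` is an active SparkSession:
# list_partitions(spark, "dev.student")
```

An empty list right after createTable, followed by a populated list after MSCK REPAIR TABLE, would match the behavior described above.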

Update an External Table with New Data

This is where things get tricky.

If we need to write new data to the source location of the table, there are several options.

First, there is mode='append'. This option appends the data to existing partitions and adds new partitions where necessary.

pdf_student2 = pd.DataFrame({
  'name': ['b', 'd', 'e'],
  'sid': [2,4,5],
  'score': [63, 72, 100]
})
df_student2 = spark.createDataFrame(pdf_student2)

_ = (
  df_student2
  .coalesce(1)
  .write
  .format('parquet')
  .partitionBy('sid')
  .mode('append')
  .save('/path/to/data/student')
)

The data in df_student2 is appended to the existing data:

name  score  sid
a     82     1
b     63     2
b     93     2
c     77     3
d     72     4
e     100    5

We can see that another student with sid = 2 is appended.

The second option is mode='overwrite' with the config spark.sql.sources.partitionOverwriteMode set to STATIC, which is the default for a Spark session. This overwrites the entire dataset under the target path.

_ = (
  df_student2
  .coalesce(1)
  .write
  .format('parquet')
  .partitionBy('sid')
  .mode('overwrite')
  .save('/path/to/data/student')
)

name  score  sid
b     63     2
d     72     4
e     100    5

Only data in the new df_student2 is present. The old data is completely overwritten even for partitions that are not present in df_student2.

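The third option is mode='overwrite' with spark.sql.sources.partitionOverwriteMode set to DYNAMIC, for example via spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic"). This overwrites only the partitions that appear in the new data, adds any new partitions, and leaves all other existing partitions untouched.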

_ = (
  df_student2
  .coalesce(1)
  .write
  .format('parquet')
  .partitionBy('sid')
  .mode('overwrite')
  .save('/path/to/data/student')
)

name  score  sid
a     82     1
b     63     2
c     77     3
d     72     4
e     100    5
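The three write behaviors can be summarized with a toy model in plain Python. This is not Spark code: dictionaries stand in for partition folders, and write_partitions is a hypothetical function that only mimics the semantics described above, starting each time from the original three students.

```python
def write_partitions(existing, incoming, mode, overwrite_mode="static"):
    """Toy model of a partitioned write.

    `existing` and `incoming` map a partition value (sid) to a list of
    (name, score) rows. Returns the partitions after the write.
    """
    # Copy so the caller's `existing` data is never mutated.
    result = {sid: list(rows) for sid, rows in existing.items()}
    if mode == "append":
        # Add rows to matching partitions, create missing ones.
        for sid, rows in incoming.items():
            result.setdefault(sid, []).extend(rows)
    elif mode == "overwrite" and overwrite_mode == "static":
        # Everything under the path is replaced by the incoming data.
        result = {sid: list(rows) for sid, rows in incoming.items()}
    elif mode == "overwrite" and overwrite_mode == "dynamic":
        # Only partitions present in the incoming data are replaced.
        for sid, rows in incoming.items():
            result[sid] = list(rows)
    else:
        raise ValueError(f"unsupported combination: {mode!r}, {overwrite_mode!r}")
    return result


student = {1: [("a", 82)], 2: [("b", 93)], 3: [("c", 77)]}
student2 = {2: [("b", 63)], 4: [("d", 72)], 5: [("e", 100)]}

appended = write_partitions(student, student2, "append")
static = write_partitions(student, student2, "overwrite", "static")
dynamic = write_partitions(student, student2, "overwrite", "dynamic")
```

In this model, appended keeps both rows for sid 2, static contains only sids 2, 4 and 5, and dynamic keeps sids 1 and 3 while replacing sid 2, which mirrors the three result tables shown above.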

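We can also add new partitions explicitly. Because this registers only the named partitions instead of rescanning the whole directory, it is better suited to data that lands in new partitions.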

pdf_student3 = pd.DataFrame({
  'name': ['x', 'y'],
  'sid': [6,7],
  'score': [88, 99]
})

df_student3 = spark.createDataFrame(pdf_student3)

_ = (
  df_student3
  .coalesce(1)
  .write
  .format('parquet')
  .partitionBy('sid')
  .mode('append')
  .save('/path/to/data/student')
)

Then register the new partitions in the metastore:

ALTER TABLE dev.student ADD PARTITION (sid='6') LOCATION '/path/to/data/student/sid=6';
ALTER TABLE dev.student ADD PARTITION (sid='7') LOCATION '/path/to/data/student/sid=7';
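When many partitions arrive at once, the statements can be generated instead of typed by hand. The following is a minimal sketch under the assumption that partition folders follow the column=value layout used in this post; add_partition_sql and register_partitions are hypothetical helpers, and IF NOT EXISTS is added so that re-running the helper does not fail on partitions that are already registered.

```python
def add_partition_sql(table, root, column, values):
    """Build one ALTER TABLE ... ADD PARTITION statement per value."""
    root = root.rstrip("/")
    return [
        f"ALTER TABLE {table} ADD IF NOT EXISTS "
        f"PARTITION ({column}='{v}') LOCATION '{root}/{column}={v}'"
        for v in values
    ]


def register_partitions(spark, table, root, column, values):
    """Run the generated statements against an active SparkSession."""
    for stmt in add_partition_sql(table, root, column, values):
        spark.sql(stmt)


# Usage, assuming `spark` is an active SparkSession:
# register_partitions(spark, "dev.student", "/path/to/data/student", "sid", [6, 7])
```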

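Another Syntax with insertInto

Similar behavior can be achieved by calling insertInto on a Spark table, with the same partitionOverwriteMode config controlling what gets overwritten. The PySpark documentation describes the method as follows: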

insertInto(tableName, overwrite=None)

Inserts the content of the DataFrame to the specified table.
It requires that the schema of the DataFrame is the same as the schema of the table.
Optionally overwriting any existing data.

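CAUTION: insertInto matches columns by position, not by name. For example, if the DataFrame's columns are ordered (name, sid, score) while the table's columns are (name, score, sid), the sid values are written into score and the score values into sid. This is similar to the difference between union and unionByName. Therefore, the column ordering needs to be carefully maintained.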
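One defensive pattern is to reorder the DataFrame's columns to the table's order before inserting. The sketch below assumes an active SparkSession passed in as spark; columns_in_table_order and insert_by_name are hypothetical helper names, not Spark APIs.

```python
def columns_in_table_order(df_columns, table_columns):
    """Return the table's column order after checking both sides match."""
    if len(df_columns) != len(table_columns) or set(df_columns) != set(table_columns):
        raise ValueError(
            f"column mismatch: DataFrame has {list(df_columns)}, "
            f"table has {list(table_columns)}"
        )
    return list(table_columns)


def insert_by_name(spark, df, table, overwrite=False):
    """Reorder `df` to the table's column order, then insert by position."""
    ordered = columns_in_table_order(df.columns, spark.table(table).columns)
    df.select(*ordered).write.insertInto(table, overwrite=overwrite)


# Usage, assuming `spark` is an active SparkSession:
# insert_by_name(spark, df_student2, "dev.student", overwrite=True)
```

Raising on a mismatch is a design choice here: a silent positional insert with the wrong columns is much harder to notice afterwards than an error at write time.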

