Snowflake
Installation
Enabling SQLFrame
SQLFrame can be used in two ways:
- Directly importing the sqlframe.snowflake package
- Using the activate function to allow for continuing to use pyspark.sql but have it use SQLFrame behind the scenes.
Import
If converting a PySpark pipeline, all pyspark.sql imports should be replaced with sqlframe.snowflake.
In addition, many classes will have a Snowflake prefix.
For example, SnowflakeDataFrame instead of DataFrame.
# PySpark import
# from pyspark.sql import SparkSession
# from pyspark.sql import functions as F
# from pyspark.sql.dataframe import DataFrame
# SQLFrame import
from sqlframe.snowflake import SnowflakeSession
from sqlframe.snowflake import functions as F
from sqlframe.snowflake import SnowflakeDataFrame
Activate
If you would like to continue using pyspark.sql but have it use SQLFrame behind the scenes, you can use the activate function.
import os
from snowflake.connector import connect
from sqlframe import activate
conn = connect(
account=os.environ["SNOWFLAKE_ACCOUNT"],
user=os.environ["SNOWFLAKE_USER"],
password=os.environ["SNOWFLAKE_PASSWORD"],
warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
database=os.environ["SNOWFLAKE_DATABASE"],
schema=os.environ["SNOWFLAKE_SCHEMA"],
)
activate("snowflake", conn=conn)
from pyspark.sql import SparkSession
SparkSession will now be a SQLFrame SnowflakeSession object and everything will be run on Snowflake directly.
See activate configuration for information on how to pass in a connection and config options.
Creating a Session
SQLFrame uses the Snowflake Python Connector to connect to Snowflake.
A SnowflakeSession, which implements the PySpark Session API, can be created by passing in a snowflake.connector.connection.SnowflakeConnection object.
import os
from snowflake.connector import connect
from sqlframe.snowflake import SnowflakeSession
from sqlframe.snowflake import functions as F
connection = connect(
account=os.environ["SNOWFLAKE_ACCOUNT"],
user=os.environ["SNOWFLAKE_USER"],
password=os.environ["SNOWFLAKE_PASSWORD"],
warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
database=os.environ["SNOWFLAKE_DATABASE"],
schema=os.environ["SNOWFLAKE_SCHEMA"],
)
session = SnowflakeSession(conn=connection)
Activate
```python
import os
from snowflake.connector import connect
from sqlframe import activate
conn = connect(
account=os.environ["SNOWFLAKE_ACCOUNT"],
user=os.environ["SNOWFLAKE_USER"],
password=os.environ["SNOWFLAKE_PASSWORD"],
warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
database=os.environ["SNOWFLAKE_DATABASE"],
schema=os.environ["SNOWFLAKE_SCHEMA"],
)
activate("snowflake", conn=conn)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
```
Using Snowflake Unique Functions
Snowflake may have a function that isn't represented within the PySpark API. If that is the case, you can call it directly using the PySpark call_function function.
import os
from snowflake.connector import connect
from sqlframe.snowflake import SnowflakeSession
from sqlframe.snowflake import functions as F
connection = connect(
account=os.environ["SQLFRAME_SNOWFLAKE_ACCOUNT"],
user=os.environ["SQLFRAME_SNOWFLAKE_USER"],
password=os.environ["SQLFRAME_SNOWFLAKE_PASSWORD"],
warehouse=os.environ["SQLFRAME_SNOWFLAKE_WAREHOUSE"],
# Dataset: https://app.snowflake.com/marketplace/listing/GZ1M6ZVQIAF/insights-global-covid-statistics
database="GLOBAL_COVID_STATISTICS",
schema=os.environ["SQLFRAME_SNOWFLAKE_SCHEMA"],
)
session = SnowflakeSession(conn=connection)
(
session.range(1)
.select(F.call_function("SNOWFLAKE.CORTEX.COMPLETE", F.lit("snowflake-arctic"), F.lit("What are large language models?")).alias("prompt_example"))
.show()
)
Example Usage
import os
from snowflake.connector import connect
from sqlframe.snowflake import SnowflakeSession
from sqlframe.snowflake import functions as F
connection = connect(
account=os.environ["SQLFRAME_SNOWFLAKE_ACCOUNT"],
user=os.environ["SQLFRAME_SNOWFLAKE_USER"],
password=os.environ["SQLFRAME_SNOWFLAKE_PASSWORD"],
warehouse=os.environ["SQLFRAME_SNOWFLAKE_WAREHOUSE"],
# Dataset: https://app.snowflake.com/marketplace/listing/GZ1M6ZVQIAF/insights-global-covid-statistics
database="GLOBAL_COVID_STATISTICS",
schema=os.environ["SQLFRAME_SNOWFLAKE_SCHEMA"],
)
session = SnowflakeSession(conn=connection)
df = (
session.table("INSIGHTS.COVID19_METRICS_BY_COUNTRY")
.where(F.col("date").between("2021-01-01", "2021-12-31"))
.groupby("continent", "country")
.agg(
F.sum("total_case").alias("total_cases"),
F.sum("total_deaths").alias("total_deaths"),
(F.sum("total_deaths").cast("float") / F.nullif(F.sum("total_case"), F.lit(0))).alias("death_rate"),
)
.orderBy(F.col("death_rate").desc_nulls_last())
)
df.show(5)
"""
+---------------+---------+-------------+--------------+---------------+
| CONTINENT | COUNTRY | TOTAL_CASES | TOTAL_DEATHS | DEATH_RATE |
+---------------+---------+-------------+--------------+---------------+
| Asia | Yemen | 2372121 | 475483 | 0.2004463516 |
| South America | Peru | 675530177 | 62217492 | 0.09210172116 |
| North America | Mexico | 1039445357 | 89674958 | 0.08627193089 |
| Africa | Sudan | 13109385 | 929258 | 0.07088494235 |
| Asia | Syria | 9963515 | 654570 | 0.06569669439 |
+---------------+---------+-------------+--------------+---------------+
"""
Supported PySpark API Methods
See something that you would like to see supported? Open an issue!
Catalog Class
- add_table
- SQLFrame Specific: Adds a table to known schemas that SQLFrame tracks
- currentCatalog
- currentDatabase
- databaseExists
- functionExists
- getDatabase
- getFunction
- getTable
- get_columns
- SQLFrame Specific: Similar to listColumns but returns SQLGlot expressions instead
- get_columns_from_schema
- SQLFrame Specific: Gets the columns from the known schemas to SQLFrame
- listCatalogs
- listColumns
- listDatabases
- listFunctions
- listTables
- setCurrentCatalog
- setCurrentDatabase
- tableExists
Column Class
- alias
- asc
- asc_nulls_first
- asc_nulls_last
- between
- cast
- contains
- desc
- desc_nulls_first
- desc_nulls_last
- endswith
- ilike
- isNotNull
- isNull
- isin
- like
- otherwise
- over
- rlike
- sql
- SQLFrame Specific: Get the SQL representation of a given column
- startswith
- when
DataFrame Class
- agg
- alias
- approxQuantile
- cache
- coalesce
- collect
- columns
- copy
- corr
- count
- cov
- createOrReplaceTempView
- crossJoin
- cube
- distinct
- drop
- dropDuplicates
- drop_duplicates
- dropna
- exceptAll
- explain
- fillna
- filter
- first
- groupBy
- groupby
- head
- intersect
- intersectAll
- join
- limit
- lineage
- Get lineage for a specific column. Returns a SQLGlot Node. Can be used to get lineage SQL or HTML representation.
- na
- orderBy
- persist
- printSchema
- replace
- schema
- select
- show
- Vertical Argument is not Supported
- sort
- sql
- SQLFrame Specific: Get the SQL representation of a given DataFrame
- stat
- toDF
- toPandas
- union
- unionAll
- unionByName
- unpivot
- where
- withColumn
- withColumnRenamed
- withColumnsRenamed
- write
Functions
- abs
- acos
- acosh
- add_months
- any_value
- Always includes nulls
- approxCountDistinct
- approx_count_distinct
- array
- array_contains
- array_distinct
- array_except
- array_intersect
- array_join
- Null values are represented as nothing instead of NULL. Ex: "a," instead of "a,NULL"
- array_max
- array_min
- array_position
- array_remove
- array_reverse
- SQLFrame Specific: Functions like reverse but for only arrays
- array_size
- array_sort
- array_union
- arrays_overlap
- asc
- asc_nulls_first
- asc_nulls_last
- ascii
- asin
- asinh
- atan
- atan2
- atanh
- avg
- base64
- bit_get
- bit_length
- bitmap_bit_position
- bitmap_bucket_number
- bitwiseNOT
- bitwise_not
- bool_and
- bool_or
- bround
- Input must be a fixed-point number
- btrim
- call_function
- cbrt
- ceil
- ceiling
- char
- char_length
- character_length
- coalesce
- col
- collect_list
- collect_set
- collate
- concat
- Can only concat strings not arrays
- concat_ws
- contains
- Only works on strings (does not support binary)
- convert_timezone
- corr
- cos
- cosh
- cot
- count
- countDistinct
- count_distinct
- count_if
- covar_pop
- covar_samp
- create_map
- Assumes VARCHAR datatype for key/value unless the column is explicitly cast when calling create_map
- cume_dist
- curdate
- current_date
- current_time
- current_timestamp
- current_user
- date_add
- dateadd
- date_diff
- datediff
- date_format
- date_from_unix_date
- date_sub
- date_trunc
- dayofmonth
- dayofweek
- dayofyear
- dayname
- degrees
- dense_rank
- desc
- desc_nulls_first
- desc_nulls_last
- e
- element_at
- endswith
- equal_null
- exp
- explode
- expm1
- expr
- extract
- factorial
- first_value
- Must be used with a window function, and ignoreNulls can behave slightly differently in certain situations
- flatten
- floor
- format_number
- from_unixtime
- getbit
- greatest
- grouping
- grouping_id
- hash
- The hash is calculated differently
- hex
- Hex on int does not produce the same result as Spark. Need to research why.
- hour
- initcap
- input_file_name
- inspect
- instr
- isnan
- isnull
- kurtosis
- lag
- last_day
- last_value
- Must be used with a window function
- lead
- least
- left
- length
- levenshtein
- threshold is not supported
- like
- lit
- ln
- localtimestamp
- locate
- log
- log10
- log1p
- log2
- lower
- lpad
- ltrim
- make_date
- make_timestamp
- map_concat
- map_keys
- max
- max_by
- md5
- mean
- median
- min
- min_by
- minute
- mode
- module
- month
- monthname
- months_between
- nanvl
- next_day
- nth_value
- offset > 1 means that previous values within the partition also share the same result. In Spark rows < offset have a Null value
- ntile
- nullif
- nullifzero
- octet_length
- overlay
- percent_rank
- percentile
- does not accept an array of percentiles
- percentile_approx
- posexplode
- Default order of columns is col, pos instead of pos, col
- position
- pow
- quarter
- radians
- rand
- rank
- regexp_count
- regexp_extract
- regexp_extract_all
- regexp_replace
- repeat
- replace
- right
- rint
- round
- row_number
- rpad
- rtrim
- second
- sequence
- session_user
- sha
- sha1
- sha2
- shiftLeft
- shiftRight
- shiftleft
- shiftright
- sign
- signum
- sin
- sinh
- size
- skewness
- slice
- sort_array
- soundex
- split
- Regular expressions not supported
- split_part
- sqrt
- stddev
- stddev_pop
- stddev_samp
- struct
- substring
- sum
- sumDistinct
- sum_distinct
- tan
- tanh
- timestamp_add
- timestamp_diff
- timestamp_seconds
- toRadians
- to_binary
- to_date
- to_number
- to_timestamp
- to_timestamp_ntz
- toDegrees
- translate
- trim
- trunc
- try_divide
- try_to_timestamp
- typeof
- ucase
- unbase64
- unhex
- uniform
- unix_micros
- unix_millis
- unix_seconds
- unix_timestamp
- uuid
- upper
- var_pop
- var_samp
- variance
- weekofyear
- when
- year
- zeroifnull
GroupedData Class
DataFrameReader Class
DataFrameWriter Class
- csv
- insertInto
- json
- mode
- parquet
- save
- saveAsTable
- sql
- SQLFrame Specific: Get the SQL representation of the DataFrame
SparkSession Class
DataTypes
- ArrayType
- BinaryType
- BooleanType
- ByteType
- CharType
- DataType
- DateType
- DecimalType
- DoubleType
- FloatType
- IntegerType
- LongType
- Row
- ShortType
- StringType
- StructField
- StructType
- TimestampNTZType
- TimestampType
- VarcharType
Window Class
WindowSpec Class
- orderBy
- partitionBy
- rangeBetween
- rowsBetween
- sql
- SQLFrame Specific: Get the SQL representation of the WindowSpec
Extra Functionality not Present in PySpark
SQLFrame supports the following extra functionality not in PySpark
Table Class
SQLFrame provides a Table class that supports extra DML operations like update, delete and merge. This class is returned when using the table function from the DataFrameReader class.
import os
from snowflake.connector import connect
from sqlframe.snowflake import SnowflakeSession
from sqlframe.base.table import WhenMatched, WhenNotMatched
connection = connect(
account=os.environ["SQLFRAME_SNOWFLAKE_ACCOUNT"],
user=os.environ["SQLFRAME_SNOWFLAKE_USER"],
password=os.environ["SQLFRAME_SNOWFLAKE_PASSWORD"],
warehouse=os.environ["SQLFRAME_SNOWFLAKE_WAREHOUSE"],
database=os.environ["SQLFRAME_SNOWFLAKE_DATABASE"],
schema=os.environ["SQLFRAME_SNOWFLAKE_SCHEMA"],
)
session = SnowflakeSession(conn=connection)
df_employee = session.createDataFrame(
[
{"id": 1, "fname": "Jack", "lname": "Shephard", "age": 37, "store_id": 1},
{"id": 2, "fname": "John", "lname": "Locke", "age": 65, "store_id": 2},
{"id": 3, "fname": "Kate", "lname": "Austen", "age": 37, "store_id": 3},
{"id": 4, "fname": "Claire", "lname": "Littleton", "age": 27, "store_id": 1},
{"id": 5, "fname": "Hugo", "lname": "Reyes", "age": 29, "store_id": 3},
]
)
df_employee.write.mode("overwrite").saveAsTable("employee")
table_employee = session.table("employee") # This object is of Type SnowflakeTable
Update Statement
The update method of the Table class is equivalent to the UPDATE table_name statement used in standard SQL.
# Generates a `LazyExpression` object which can be executed using the `execute` method
update_expr = table_employee.update(
set_={"age": table_employee["age"] + 1},
where=table_employee["id"] == 1,
)
# Executes the update statement
update_expr.execute()
# Show the result
table_employee.show()
Output:
+----+--------+-----------+-----+----------+
| id | fname | lname | age | store_id |
+----+--------+-----------+-----+----------+
| 1 | Jack | Shephard | 38 | 1 |
| 2 | John | Locke | 65 | 2 |
| 3 | Kate | Austen | 37 | 3 |
| 4 | Claire | Littleton | 27 | 1 |
| 5 | Hugo | Reyes | 29 | 3 |
+----+--------+-----------+-----+----------+
Delete Statement
The delete method of the Table class is equivalent to the DELETE FROM table_name statement used in standard SQL.
# Generates a `LazyExpression` object which can be executed using the `execute` method
delete_expr = table_employee.delete(
where=table_employee["id"] == 1,
)
# Executes the delete statement
delete_expr.execute()
# Show the result
table_employee.show()
Output:
+----+--------+-----------+-----+----------+
| id | fname | lname | age | store_id |
+----+--------+-----------+-----+----------+
| 2 | John | Locke | 65 | 2 |
| 3 | Kate | Austen | 37 | 3 |
| 4 | Claire | Littleton | 27 | 1 |
| 5 | Hugo | Reyes | 29 | 3 |
+----+--------+-----------+-----+----------+
Merge Statement
The merge method of the Table class is equivalent to the MERGE INTO table_name statement used in some SQL engines.
df_new_employee = session.createDataFrame(
[
{"id": 1, "fname": "Jack", "lname": "Shephard", "age": 38, "store_id": 1, "delete": False},
{"id": 2, "fname": "Cate", "lname": "Austen", "age": 39, "store_id": 5, "delete": False},
{"id": 5, "fname": "Ugo", "lname": "Reyes", "age": 29, "store_id": 3, "delete": True},
{"id": 6, "fname": "Sun-Hwa", "lname": "Kwon", "age": 27, "store_id": 5, "delete": False},
]
)
# Generates a `LazyExpression` object which can be executed using the `execute` method
merge_expr = table_employee.merge(
df_new_employee,
condition=table_employee["id"] == df_new_employee["id"],
clauses=[
WhenMatched(condition=table_employee["fname"] == df_new_employee["fname"]).update(
set_={
"age": df_new_employee["age"],
}
),
WhenMatched(condition=df_new_employee["delete"]).delete(),
WhenNotMatched().insert(
values={
"id": df_new_employee["id"],
"fname": df_new_employee["fname"],
"lname": df_new_employee["lname"],
"age": df_new_employee["age"],
"store_id": df_new_employee["store_id"],
}
),
],
)
# Executes the merge statement
merge_expr.execute()
# Show the result
table_employee.show()
Output:
+----+---------+-----------+-----+----------+
| id | fname | lname | age | store_id |
+----+---------+-----------+-----+----------+
| 1 | Jack | Shephard | 38 | 1 |
| 2 | John | Locke | 65 | 2 |
| 3 | Kate | Austen | 37 | 3 |
| 4 | Claire | Littleton | 27 | 1 |
| 6 | Sun-Hwa | Kwon | 27 | 5 |
+----+---------+-----------+-----+----------+