Source code for snowflake_utilities.upload

r"""Submodule upload.py includes the following functions:

- **upload_pandas_df_to_snowflake** - uploads a Pandas DataFrame to Snowflake

"""


import os
import pandas as pd
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine


def upload_pandas_df_to_snowflake(
    data: pd.DataFrame = pd.DataFrame(),
    warehouse_name: str = "tiny_warehouse_mg",
    database_name: str = "mydb",
    schema_name: str = "myschema",
    table_name: str = "mytable",
    if_exists: str = "replace",
) -> pd.DataFrame:
    """
    Uploads a Pandas DataFrame to Snowflake and reads the table back.

    Connection credentials are taken from the environment variables
    ``SNOWFLAKE_ACCOUNT``, ``SNOWFLAKE_USERNAME`` and ``SNOWFLAKE_PASSWORD``.
    The write and the follow-up read share one connection and run inside
    a single ``conn.begin()`` transaction block.

    Parameters
    ----------
    data : pd.DataFrame
        The Pandas DataFrame to upload. Its index is not written
        (``index=False``).
    warehouse_name : str
        The name of the warehouse to use
    database_name : str
        The name of the database to use in Snowflake
    schema_name : str
        The name of the schema to use in Snowflake
    table_name : str
        The name of the table to use in Snowflake. It is interpolated
        unescaped into the read-back ``SELECT`` statement, so it must come
        from a trusted source.
    if_exists : str
        The action to take if the table already exists in Snowflake
        Options are 'fail', 'replace', 'append'

    Returns
    -------
    new_df : pd.DataFrame
        The Pandas DataFrame that was pulled from Snowflake following the upload

    Notes
    -----
    Any exception raised while connecting, writing or reading propagates to
    the caller unchanged; the engine is disposed either way.
    """
    engine = create_engine(
        URL(
            account=os.getenv("SNOWFLAKE_ACCOUNT"),
            user=os.getenv("SNOWFLAKE_USERNAME"),
            password=os.getenv("SNOWFLAKE_PASSWORD"),
            database=database_name,
            schema=schema_name,
            warehouse=warehouse_name,
        )
    )
    try:
        with engine.connect() as conn, conn.begin():
            data.to_sql(table_name, conn, if_exists=if_exists, index=False)
            # NOTE: table_name goes into the SQL text as-is (identifiers cannot
            # be bound as parameters); never pass untrusted input here.
            new_df = pd.read_sql(f"SELECT * FROM {table_name}", conn)
    finally:
        # Release the connection pool even when the upload or read-back fails;
        # previously dispose() was skipped on any exception.
        engine.dispose()
    return new_df