Use DirectQuery#
Access to the DirectQuery plugins is required to follow this how-to.
This how-to walks you through the basics of DirectQuery by creating an application on Snowflake.
We’ll see how to:
- Connect to an external database
- Create a data model based on this external database
- Use this data model in a cube
[1]:
import os
import atoti as tt
from atoti_directquery_snowflake import (
    SnowflakeConnectionInfo,
    SnowflakeStandardTableOptions,
)
session = tt.Session()
Connecting to the external database#
Once we have a session, we can connect to the external database:
[2]:
connection_info = SnowflakeConnectionInfo(
    f"jdbc:snowflake://{os.environ['SNOWFLAKE_ACCOUNT_IDENTIFIER']}.snowflakecomputing.com/?user={os.environ['SNOWFLAKE_USERNAME']}&database=TEST_RESOURCES&schema=TESTS",
    password=os.environ["SNOWFLAKE_PASSWORD"],
)
external_database = session.connect_to_external_database(connection_info)
The connection info class is the only part that differs between the supported databases. For example, you can use ClickhouseConnectionInfo to connect to ClickHouse instead of Snowflake.
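The JDBC URL above is assembled from environment variables inside an f-string, so a missing variable only surfaces as a bare KeyError. As a minimal sketch, the URL could be built by a small helper that reports missing variables and URL-encodes the parameters. The helper name `build_snowflake_jdbc_url` and the sample values are hypothetical and not part of atoti; only the URL shape is taken from the cell above.

```python
from urllib.parse import urlencode


def build_snowflake_jdbc_url(env, database, schema):
    """Build a Snowflake JDBC URL from a mapping of environment variables.

    Hypothetical helper: it only mirrors the URL shape used in this how-to.
    """
    required = ("SNOWFLAKE_ACCOUNT_IDENTIFIER", "SNOWFLAKE_USERNAME")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise ValueError(f"Missing environment variables: {', '.join(missing)}")
    # urlencode escapes characters that are not safe in a query string.
    query = urlencode(
        {"user": env["SNOWFLAKE_USERNAME"], "database": database, "schema": schema}
    )
    host = f"{env['SNOWFLAKE_ACCOUNT_IDENTIFIER']}.snowflakecomputing.com"
    return f"jdbc:snowflake://{host}/?{query}"


# Made-up values standing in for os.environ.
url = build_snowflake_jdbc_url(
    {"SNOWFLAKE_ACCOUNT_IDENTIFIER": "my_account", "SNOWFLAKE_USERNAME": "alice"},
    database="TEST_RESOURCES",
    schema="TESTS",
)
print(url)
```

In a real session, `os.environ` would be passed as `env` and the resulting string given to SnowflakeConnectionInfo as in the cell above.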
Discovering the external tables#
We can list the various tables available on the Snowflake server:
[3]:
# The method below is private and subject to change in patch releases.
external_database.tables._filter(schema_pattern="TUTORIAL")
[3]:
- SNOWFLAKE
  - TEST_RESOURCES
    - TUTORIAL
      - PRODUCTS
      - SALES
- TUTORIAL
- TEST_RESOURCES
Building the data model#
The output above lists the two tables of interest, SALES and PRODUCTS. Let’s add them to the session and join them:
[4]:
sales_table = session.add_external_table(
    external_database.tables["TEST_RESOURCES", "TUTORIAL", "SALES"],
    options=SnowflakeStandardTableOptions(keys=["SALE_ID"]),
)
products_table = session.add_external_table(
    external_database.tables["TEST_RESOURCES", "TUTORIAL", "PRODUCTS"],
    options=SnowflakeStandardTableOptions(keys=["PRODUCT_ID"]),
)
sales_table.join(products_table, mapping={"PRODUCT": "PRODUCT_ID"})
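To make the `mapping` argument concrete, here is a rough pure-Python sketch of the idea: each SALES row looks up the PRODUCTS row whose PRODUCT_ID equals its PRODUCT value. The rows are made up for illustration, `lookup_join` is a hypothetical helper, and the handling of unmatched rows (filled with None here) is an assumption, not a description of what atoti or Snowflake actually does.

```python
# Made-up sample rows, not data from the tutorial tables.
sales = [
    {"SALE_ID": "S1", "PRODUCT": "P1", "UNIT_PRICE": 10.0},
    {"SALE_ID": "S2", "PRODUCT": "P2", "UNIT_PRICE": 20.0},
    {"SALE_ID": "S3", "PRODUCT": "P9", "UNIT_PRICE": 30.0},  # no matching product
]
products = [
    {"PRODUCT_ID": "P1", "CATEGORY": "Furniture"},
    {"PRODUCT_ID": "P2", "CATEGORY": "Cloth"},
]


def lookup_join(source_rows, target_rows, mapping):
    """Attach to each source row the target row matched through `mapping`.

    `mapping` maps source column names to target column names.
    """
    source_cols = list(mapping)
    target_cols = [mapping[col] for col in source_cols]
    # Index the target rows by the values of their join columns.
    index = {tuple(row[col] for col in target_cols): row for row in target_rows}
    first_target = target_rows[0] if target_rows else {}
    extra_cols = [col for col in first_target if col not in target_cols]
    joined = []
    for row in source_rows:
        match = index.get(tuple(row[col] for col in source_cols))
        # Assumption of this sketch: unmatched rows get None for target columns.
        extra = {col: (match[col] if match else None) for col in extra_cols}
        joined.append({**row, **extra})
    return joined


joined = lookup_join(sales, products, {"PRODUCT": "PRODUCT_ID"})
for row in joined:
    print(row)
```

Every source row is kept, which is why the sketch returns as many rows as there are sales.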
At any point, you can inspect a table’s definition in the external database, as well as that definition as interpreted by atoti.
[5]:
# Table definition in the external database:
external_database.tables["TEST_RESOURCES", "TUTORIAL", "SALES"]
[5]:
SnowflakeTable(table_id=ExternalTableId(database_name='TEST_RESOURCES', schema_name='TUTORIAL', table_name='SALES'), types={'SALE_ID': 'String', 'DATE': 'LocalDate', 'SHOP': 'String', 'PRODUCT': 'String', 'QUANTITY': 'double', 'UNIT_PRICE': 'double'}, _database_key='SNOWFLAKE')
[6]:
# Table definition in the atoti session:
sales_table
[6]:
- SALES
  - SALE_ID
    - key: True
    - type: String
    - default_value: N/A
  - DATE
    - key: False
    - type: LocalDate
    - default_value: datetime.date(1970, 1, 1)
  - SHOP
    - key: False
    - type: String
    - default_value: N/A
  - PRODUCT
    - key: False
    - type: String
    - default_value: N/A
  - QUANTITY
    - key: False
    - type: double
    - default_value: 0.0
  - UNIT_PRICE
    - key: False
    - type: double
    - default_value: 0.0
- SALE_ID
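The output above pairs each column type with a default value. As a hedged sketch, assuming the default value stands in for missing (null) values in a column, the behavior can be imitated in plain Python. The function `fill_defaults` and the sample row are hypothetical; the types and defaults are copied from the outputs above.

```python
import datetime

# Defaults per type, copied from the table definition shown above.
DEFAULTS_BY_TYPE = {
    "String": "N/A",
    "LocalDate": datetime.date(1970, 1, 1),
    "double": 0.0,
}

# Column types of the SALES table, copied from the external table definition.
SALES_TYPES = {
    "SALE_ID": "String",
    "DATE": "LocalDate",
    "SHOP": "String",
    "PRODUCT": "String",
    "QUANTITY": "double",
    "UNIT_PRICE": "double",
}


def fill_defaults(row, types):
    """Return a copy of `row` where missing or None values use the type default."""
    return {
        col: DEFAULTS_BY_TYPE[types[col]] if row.get(col) is None else row[col]
        for col in types
    }


# Made-up partial row for illustration.
filled = fill_defaults({"SALE_ID": "S1", "SHOP": None, "QUANTITY": None}, SALES_TYPES)
print(filled)
```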
Creating the cube#
The cube, its hierarchies, and its measures are defined just as they would be in a session backed by an in-memory database:
[7]:
cube = session.create_cube(sales_table)
l, h, m = cube.levels, cube.hierarchies, cube.measures
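# Build a two-level CATEGORY hierarchy: CATEGORY, then SUB_CATEGORY.
h["CATEGORY"] = [l["CATEGORY"], l["SUB_CATEGORY"]]
# Remove the single-level SUB_CATEGORY hierarchy, now redundant.
del h["SUB_CATEGORY"]
# Define a measure holding the maximum unit price.
m["Max Price"] = tt.agg.max(sales_table["UNIT_PRICE"])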
Running a query#
Let’s make sure everything works by running a simple query:
[8]:
cube.query(m["Max Price"], levels=[l["SHOP"]])
[8]:
| SHOP | Max Price |
|---|---|
| shop_0 | 210.00 |
| shop_1 | 300.00 |
| shop_2 | 300.00 |
| shop_3 | 210.00 |
| shop_4 | 300.00 |
| shop_5 | 150.00 |
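Conceptually, this query is a maximum of UNIT_PRICE grouped by SHOP. The sketch below reproduces that aggregation with the standard-library `sqlite3` module on made-up rows. It is only an analogy: the SQL that DirectQuery actually sends to Snowflake is not shown in this how-to, and the statement here is an assumption written for illustration.

```python
import sqlite3

# In-memory SQLite database standing in for the external database.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE SALES (SALE_ID TEXT PRIMARY KEY, SHOP TEXT, UNIT_PRICE REAL)"
)
# Made-up rows, not the tutorial data.
conn.executemany(
    "INSERT INTO SALES VALUES (?, ?, ?)",
    [("S1", "shop_a", 100.0), ("S2", "shop_a", 250.0), ("S3", "shop_b", 80.0)],
)

# Group by SHOP and keep the highest UNIT_PRICE in each group.
rows = conn.execute(
    'SELECT SHOP, MAX(UNIT_PRICE) AS "Max Price" '
    "FROM SALES GROUP BY SHOP ORDER BY SHOP"
).fetchall()
conn.close()

print(rows)  # [('shop_a', 250.0), ('shop_b', 80.0)]
```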
Refreshing the data#
If the external database receives updates, some elements retrieved by DirectQuery will be out of date. You can manually trigger a refresh of the session to bring it back in sync:
[9]:
# The method below is private and subject to change in patch releases.
session._synchronize_with_external_database()
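If the external database is updated on a regular schedule, the manual refresh could be repeated at a fixed interval. The sketch below is one simple way to do that with a blocking loop. The helper `refresh_every` is hypothetical and not part of atoti, and a stand-in callable is used here in place of the session so that the example runs on its own.

```python
import time
from typing import Callable


def refresh_every(
    refresh: Callable[[], None], interval_seconds: float, iterations: int
) -> int:
    """Call `refresh` `iterations` times, sleeping between consecutive calls.

    Returns the number of refreshes performed.
    """
    done = 0
    for i in range(iterations):
        refresh()
        done += 1
        # No need to sleep after the last refresh.
        if i < iterations - 1:
            time.sleep(interval_seconds)
    return done


# Stand-in for the refresh call, so the sketch runs without a session.
calls = []
count = refresh_every(
    lambda: calls.append("refreshed"), interval_seconds=0.0, iterations=3
)
print(count, calls)
```

With a real session, the private method from the cell above would be passed as `refresh`, for example `refresh_every(session._synchronize_with_external_database, 60, 10)`, keeping in mind that this method may change in patch releases.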
Going further#
Connect to BigQuery, Clickhouse or Synapse