Use DirectQuery#

This feature is not part of the community edition: it needs to be unlocked.

This tutorial walks you through the basics of DirectQuery by building an application on top of Snowflake.

We’ll see how to:

  • Connect to an external database

  • Create a data model based on this external database

  • Use this data model in a cube

[1]:
import os
import atoti as tt
from atoti_directquery_snowflake import (
    SnowflakeConnectionInfo,
    SnowflakeTableOptions,
)

session = tt.Session()

Connecting to the external database#

Once we have a session, we can connect to the external database:

[2]:
connection_info = SnowflakeConnectionInfo(
    f"jdbc:snowflake://{os.environ['SNOWFLAKE_ACCOUNT_IDENTIFIER']}.snowflakecomputing.com/?user={os.environ['SNOWFLAKE_USERNAME']}&database=TEST_RESOURCES&schema=TESTS",
    password=os.environ["SNOWFLAKE_PASSWORD"],
)
external_database = session.connect_to_external_database(connection_info)

The connection type is what differs between the supported databases. For example, you can switch to ClickhouseConnectionInfo to connect to ClickHouse instead of Snowflake.
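
A rough sketch of such a ClickHouse connection could look like this (the environment variables and connection string below are illustrative assumptions; the exact URL format expected by ClickhouseConnectionInfo is described in its documentation):

from atoti_directquery_clickhouse import ClickhouseConnectionInfo

# Hypothetical connection string: adjust the host, port, database, and
# credentials to match your ClickHouse deployment.
clickhouse_connection_info = ClickhouseConnectionInfo(
    f"clickhouse:http://{os.environ['CLICKHOUSE_HOST']}:8123/TEST_RESOURCES"
    f"?user={os.environ['CLICKHOUSE_USERNAME']}",
    password=os.environ["CLICKHOUSE_PASSWORD"],
)
clickhouse_database = session.connect_to_external_database(clickhouse_connection_info)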

Discovering the external tables#

We can list the various tables available on the Snowflake server:

[3]:
# The method below is private and susceptible to change in patch releases.
external_database.tables._filter(schema_pattern="TUTORIAL")
[3]:
  • SNOWFLAKE
    • TEST_RESOURCES
      • TUTORIAL
        1. MULTI_COLUMN_QUANTITY
        2. MULTI_ROW_QUANTITY
        3. PRODUCTS
        4. SALES
        5. SALES_BY_PRODUCT

Building the data model#

We can see the two tables of interest to us, add them to the session, and join them:

[4]:
# Add the external tables to the session, declaring their key columns.
sales_table = session.add_external_table(
    external_database.tables["TEST_RESOURCES", "TUTORIAL", "SALES"],
    options=SnowflakeTableOptions(keys=["SALE_ID"]),
)
products_table = session.add_external_table(
    external_database.tables["TEST_RESOURCES", "TUTORIAL", "PRODUCTS"],
    options=SnowflakeTableOptions(keys=["PRODUCT_ID"]),
)
# Join SALES to PRODUCTS on the product identifier.
sales_table.join(
    products_table, sales_table["PRODUCT"] == products_table["PRODUCT_ID"]
)

Note that, at any point, you can inspect the table’s definition in the external database, as well as its definition as interpreted by Atoti.

[5]:
# Table definition in the external database:
external_database.tables["TEST_RESOURCES", "TUTORIAL", "SALES"]
[5]:
  • SALES
    • SALE_ID: String
    • DATE: LocalDate
    • SHOP: String
    • PRODUCT: String
    • QUANTITY: double
    • UNIT_PRICE: double
[6]:
# Table definition in the Atoti session:
sales_table
[6]:
  • SALES
    • SALE_ID
      • key: True
      • type: String
      • default_value: N/A
    • DATE
      • key: False
      • type: LocalDate
      • default_value: datetime.date(1970, 1, 1)
    • SHOP
      • key: False
      • type: String
      • default_value: N/A
    • PRODUCT
      • key: False
      • type: String
      • default_value: N/A
    • QUANTITY
      • key: False
      • type: double
      • default_value: 0.0
    • UNIT_PRICE
      • key: False
      • type: double
      • default_value: 0.0

Creating the cube#

The cube, its hierarchies, and its measures can be defined just as they would be on a session backed by an in-memory database:

[7]:
cube = session.create_cube(sales_table)
l, h, m = cube.levels, cube.hierarchies, cube.measures

h["CATEGORY"] = [l["CATEGORY"], l["SUB_CATEGORY"]]
del h["SUB_CATEGORY"]

m["Max Price"] = tt.agg.max(sales_table["UNIT_PRICE"])

Running a query#

Let’s make sure everything works by running a query:

[8]:
cube.query(m["Max Price"], levels=[l["SHOP"]])
[8]:
        Max Price
SHOP
shop_0     210.00
shop_1     300.00
shop_2     300.00
shop_3     210.00
shop_4     300.00
shop_5     150.00

Refreshing the data#

If the external database receives updates, the data retrieved through DirectQuery can become out of date. You can manually trigger a refresh of the session to restore synchronization:

[9]:
# The method below is private and susceptible to change in patch releases.
session._synchronize_with_external_database()
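
For example, after a refresh you can re-run the earlier query; assuming rows were added or modified in the external SALES table, the results will now reflect its current contents:

# Same query as before; the results now reflect the current state
# of the external database.
cube.query(m["Max Price"], levels=[l["SHOP"]])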

Going further#

Connect to: