Scale with distribution#

This feature is not part of the community edition: it needs to be unlocked.

A Session runs on a single machine and the volume of data it can handle is thus limited by the machine’s CPU and memory. Multiple sessions, each running on a dedicated machine, can be configured to join a cluster and contribute to a QueryCube in a QuerySession. Query cubes distribute parts of the execution of incoming queries to data cubes in the cluster, breaking away from the limitations imposed by the single-machine model.

Important: Distribution comes at a cost: the communication required between machines in the cluster adds latency and overhead. Atoti supports NUMA and is designed to scale vertically on large servers. Go for vertical scaling if possible since it simplifies deployment and monitoring. If vertical scaling is not option, then this page is for you.

Here, to fit in a notebook, the data cubes and the query cube will run on the same machine but, in production, the code must be rearranged so that session run on dedicated machines that can communicate with each other.

Setting up the first data cube#

To have something to distribute to, let’s start a cube that will act as our first data cube.

[1]:
import atoti as tt
import pandas as pd
[2]:
CUBE_NAME = "World"
[3]:
def define_data_model(data_session: tt.Session, /) -> None:
    table = data_session.create_table(
        "Facts",
        data_types={"ID": "String", "Region": "String", "Value": "long"},
        keys={"ID"},
    )
    cube = data_session.create_cube(table, name=CUBE_NAME)
    h, m = cube.hierarchies, cube.measures
    m["Portion"] = m["Value.SUM"] / tt.parent_value(
        m["Value.SUM"], degrees={h["Region"]: 1}
    )
[4]:
def load_data(data_session: tt.Session, /, *, region: str) -> None:
    table = data_session.tables["Facts"]
    dataframe = pd.DataFrame(
        [(f"region-{i}", region, i) for i in range(1, 11)],
        columns=list(table),
    )
    table.load(dataframe)
[5]:
north_data_session = tt.Session.start()
define_data_model(north_data_session)
load_data(north_data_session, region="North")

This session only has a single region:

[6]:
MDX = f"""SELECT
  [Facts].[Region].Members ON ROWS,
  {{[Measures].[contributors.COUNT]}} ON COLUMNS
  FROM [{CUBE_NAME}]"""  # noqa: S608
[7]:
north_data_session.query_mdx(MDX, keep_totals=True)
[7]:
contributors.COUNT
Region
Total 10
North 10

Setting up the query cube and the cluster#

[8]:
query_session = tt.QuerySession.start()

In Atoti, a cluster defines how multiple sessions can communicate with each other. Multiple data cubes and query cubes can contribute to the same cluster.

[9]:
from secrets import token_urlsafe
from tempfile import mkdtemp

from atoti_jdbc import JdbcPingDiscoveryProtocol

cluster_definition = tt.ClusterDefinition(
    application_names={CUBE_NAME},
    authentication_token=token_urlsafe(),
    discovery_protocol=JdbcPingDiscoveryProtocol(
        f"jdbc:h2:{mkdtemp('atoti-cluster')}",
        username="sa",
        password="",
    ),
)

We define a query cube configured to contribute to the cluster:

[10]:
CLUSTER_NAME = "My cluster"
[11]:
query_session.session.clusters[CLUSTER_NAME] = cluster_definition
query_session.query_cubes[CUBE_NAME] = tt.QueryCubeDefinition(
    cluster=query_session.session.clusters[CLUSTER_NAME],
    distributing_levels={("Facts", "Region", "Region")},
)

The query cube is ready to execute queries, but since no data cubes joined the cluster yet, there is no data:

[12]:
query_session.session.query_mdx(
    f"""SELECT {{[Measures].[contributors.COUNT]}} ON COLUMNS FROM [{CUBE_NAME}]""",  # noqa: S608
    keep_totals=True,
).empty
[12]:
True

Making the data cube join the cluster#

The only change required to make the query cube useful is to make the data cube join its cluster:

[13]:
north_data_session.clusters[CLUSTER_NAME] = cluster_definition

Data cubes join clusters asynchronously. As a result, their contribution will not be observable for some time. Polling can be used to to wait for join completion:

[14]:
from time import sleep


def wait_for_data_node(
    query_session: tt.QuerySession,
    /,
    *,
    max_retries: int = 60,
    region: str,
    sleep_duration: float = 1,
) -> None:
    remaining_retries = max_retries
    mdx = f"""SELECT {{[Measures].[contributors.COUNT]}} ON COLUMNS FROM [{CUBE_NAME}] WHERE [Facts].[Region].[Region].[{region}]"""  # noqa: S608

    def data_node_joined():
        try:
            return not query_session.session.query_mdx(mdx).empty
        except:  # noqa: E722
            return False

    while not data_node_joined():
        remaining_retries -= 1
        if not remaining_retries:
            raise RuntimeError(f"No contributor found for `{region}` region.")
        sleep(sleep_duration)
[15]:
wait_for_data_node(query_session, region="North")

At this point, we can observe in the query cube the same hierarchies as the ones defined in the data cube:

[16]:
query_session.session.cubes[CUBE_NAME].hierarchies
[16]:
  • Dimensions
    • Facts
      • ID
        1. ID
      • Region
        1. Region

And the measures too:

[17]:
query_session.session.cubes[CUBE_NAME].measures
[17]:
  • Measures
    • Portion
      • formatter: None
    • Value.MEAN
      • formatter: None
    • Value.SUM
      • formatter: None
    • contributors.COUNT
      • formatter: None
    • contributors.COUNT.World
      • formatter: None
    • update.TIMESTAMP
      • formatter: None

The data is also available:

[18]:
query_session.session.query_mdx(MDX, keep_totals=True)
[18]:
contributors.COUNT
Region
Total 10
North 10

The Cube.quey_cube_ids and QueryCube.data_cube_ids properties give opaque IDs that can be used to check how many cubes of the other kind are connected:

[19]:
len(north_data_session.cubes[CUBE_NAME].query_cube_ids)
[19]:
1
[20]:
len(query_session.query_cubes[CUBE_NAME].data_cube_ids)
[20]:
1

Adding more data cubes#

Distribution is only useful if multiple data cubes contribute to a query cube so let’s do that:

[21]:
south_data_session = tt.Session.start()
south_data_session.clusters[CLUSTER_NAME] = cluster_definition
define_data_model(south_data_session)
load_data(south_data_session, region="South")
[22]:
wait_for_data_node(query_session, region="South")
[23]:
query_session.session.query_mdx(MDX, keep_totals=True)
[23]:
contributors.COUNT
Region
Total 20
North 10
South 10