atoti.tables.Tables.data_transaction()

Tables.data_transaction(scenario_name=None, *, allow_nested=True)

Create a data transaction to batch several data loading operations.

  • It is more efficient than running each load() one after the other, especially when using load_async() to load data concurrently into multiple tables.

  • It avoids possibly incorrect intermediate states (e.g. if loading some new data requires dropping existing rows first).

  • If an exception is raised during a data transaction, the transaction is rolled back and all the changes made before the exception are discarded.
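The batching and rollback behavior listed above can be pictured with a toy model in plain Python. This is only an illustrative sketch, not atoti's implementation: `ToyTable` and its `transaction()` method are hypothetical names, and the snapshot-and-restore strategy is an assumption chosen to keep the example short.

```python
from contextlib import contextmanager


class ToyTable:
    """Hypothetical in-memory table keyed by city (not an atoti class)."""

    def __init__(self, rows):
        self.rows = dict(rows)

    @contextmanager
    def transaction(self):
        # Snapshot the rows so they can be restored if the block fails.
        snapshot = dict(self.rows)
        try:
            yield
        except BaseException:
            # Discard every change made inside the block, then re-raise.
            self.rows = snapshot
            raise


table = ToyTable({"Berlin": 150.0, "Paris": 200.0})

# Successful block: both changes are kept.
with table.transaction():
    table.rows["Singapore"] = 250.0
    del table.rows["Paris"]
rows_after_commit = dict(table.rows)

# Failing block: the change made before the error is discarded.
try:
    with table.transaction():
        table.rows["Berlin"] = 0.0
        raise ValueError("Some error")
except ValueError:
    pass
rows_after_rollback = dict(table.rows)
```

In this sketch the failed block leaves the table exactly as the successful block left it, which mirrors the all-or-nothing behavior described in the bullets.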

Note

Data transactions cannot be mixed with:

Parameters:
  • allow_nested (bool) –

    Whether to allow starting this transaction inside an already running one.

    When False, an error will be raised if this transaction is started while another transaction is already running, regardless of that outer transaction’s value of allow_nested. The benefit of passing False is that changes made in this transaction are guaranteed, if not rolled back, to be visible to the statements outside the transaction. The drawback is that it prevents splitting transaction steps into small composable functions.

    When nested transactions are allowed, changes made by inner transactions contribute transparently to the outer transaction.

  • scenario_name (str | None) – The name of the source scenario impacted by all the table operations inside the transaction.

Return type:

AbstractContextManager[None]
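The nesting rules for allow_nested can be modeled with a small context manager in plain Python. This is a hedged sketch under stated assumptions, not how atoti works internally: `ToyTransactions`, its `add()` method, and the buffering of pending changes until the outermost block exits are all hypothetical, and the error message is made up for the example.

```python
from contextlib import contextmanager


class ToyTransactions:
    """Hypothetical model of the nesting rules (not atoti's implementation)."""

    def __init__(self):
        self.committed = []  # changes visible outside any transaction
        self._pending = []  # changes waiting for the outermost commit
        self._depth = 0  # number of transactions currently running

    def add(self, change):
        # Inside a transaction, changes are buffered; outside, applied directly.
        (self._pending if self._depth else self.committed).append(change)

    @contextmanager
    def data_transaction(self, *, allow_nested=True):
        # The check ignores the outer transaction's own allow_nested value.
        if self._depth > 0 and not allow_nested:
            raise RuntimeError("Nesting is not allowed for this transaction.")
        self._depth += 1
        try:
            yield
        except BaseException:
            self._pending.clear()  # roll back everything buffered so far
            raise
        else:
            if self._depth == 1:
                # Only the outermost transaction publishes the changes.
                self.committed.extend(self._pending)
                self._pending.clear()
        finally:
            self._depth -= 1


toy = ToyTransactions()

with toy.data_transaction():
    with toy.data_transaction():  # nested: contributes to the outer one
        toy.add("Paris")
    # The inner block has exited, but nothing is published yet.
    visible_inside_outer = list(toy.committed)
    toy.add("Berlin")
visible_after_outer = list(toy.committed)

nesting_error = None
try:
    with toy.data_transaction():
        with toy.data_transaction(allow_nested=False):
            toy.add("London")
except RuntimeError as error:
    nesting_error = str(error)
```

In this model, a nested block cannot promise that its changes are visible once it exits, because publication waits for the outermost block. Refusing to nest removes that uncertainty, at the cost of composability.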

Example

>>> import pandas as pd
>>> cities_df = pd.DataFrame(
...     columns=["City", "Price"],
...     data=[
...         ("Berlin", 150.0),
...         ("London", 240.0),
...         ("New York", 270.0),
...         ("Paris", 200.0),
...     ],
... )
>>> cities_table = session.read_pandas(
...     cities_df,
...     keys={"City"},
...     table_name="Cities",
... )
>>> extra_cities_df = pd.DataFrame(
...     columns=["City", "Price"],
...     data=[
...         ("Singapore", 250.0),
...     ],
... )
>>> with session.tables.data_transaction():
...     cities_table += ("New York", 100.0)
...     cities_table.drop(cities_table["City"] == "Paris")
...     cities_table.load(extra_cities_df)
>>> cities_table.head().sort_index()
           Price
City
Berlin     150.0
London     240.0
New York   100.0
Singapore  250.0

If an exception is raised during a data transaction, all the changes made before the exception are rolled back:

>>> cities_table.load(cities_df)
>>> cities_table.head().sort_index()
          Price
City
Berlin    150.0
London    240.0
New York  270.0
Paris     200.0
>>> with session.tables.data_transaction():
...     cities_table += ("New York", 100.0)
...     cities_table.drop(cities_table["City"] == "Paris")
...     cities_table.load(extra_cities_df)
...     raise Exception("Some error")
Traceback (most recent call last):
    ...
Exception: Some error
>>> cities_table.head().sort_index()
          Price
City
Berlin    150.0
London    240.0
New York  270.0
Paris     200.0

Loading data concurrently into multiple tables:

>>> import asyncio
>>> countries_table = session.create_table(
...     "Countries",
...     data_types={"City": "String", "Country": "String"},
...     keys={"City"},
... )
>>> cities_table.join(countries_table)
>>> countries_df = pd.DataFrame(
...     columns=["City", "Country"],
...     data=[
...         ("Berlin", "Germany"),
...         ("London", "England"),
...         ("New York", "USA"),
...         ("Paris", "France"),
...     ],
... )
>>> async def load_data_in_all_tables(tables):
...     with tables.data_transaction():
...         await asyncio.gather(
...             tables["Cities"].load_async(cities_df),
...             tables["Countries"].load_async(countries_df),
...         )
>>> cities_table.drop()
>>> asyncio.run(load_data_in_all_tables(session.tables))
>>> cities_table.head()
          Price
City
Berlin    150.0
London    240.0
New York  270.0
Paris     200.0
>>> countries_table.head()
          Country
City
Berlin    Germany
London    England
New York      USA
Paris      France
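The concurrency in the example above comes from asyncio.gather(), which starts every load before waiting for any of them to finish. The sketch below shows that interleaving with the standard library alone. `fake_load_async` is a hypothetical stand-in that sleeps instead of loading data, so it says nothing about how atoti performs the loads.

```python
import asyncio

events = []


async def fake_load_async(table_name, delay):
    # Hypothetical stand-in for a table load; sleeping simulates I/O.
    events.append(f"start {table_name}")
    await asyncio.sleep(delay)
    events.append(f"end {table_name}")


async def load_all():
    # gather() schedules both coroutines before waiting for either to finish.
    await asyncio.gather(
        fake_load_async("Cities", 0.01),
        fake_load_async("Countries", 0.05),
    )


asyncio.run(load_all())
# Both "start" events are recorded before the first "end" event.
```

Awaiting the two coroutines one after the other would instead record the end of the first load before the start of the second.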

Nested transactions allowed:

>>> def composable_function(session):
...     table = session.tables["Cities"]
...     with session.tables.data_transaction():
...         table += ("Paris", 100.0)
>>> # The function can be called in isolation:
>>> composable_function(session)
>>> cities_table.head().sort_index()
       Price
City
Paris  100.0
>>> with session.tables.data_transaction(
...     allow_nested=False  # No-op because this is the outer transaction.
... ):
...     cities_table.drop()
...     cities_table += ("Berlin", 200.0)
...     # The function can also be called inside another transaction and will contribute to it:
...     composable_function(session)
...     cities_table += ("New York", 150.0)
>>> cities_table.head().sort_index()
          Price
City
Berlin    200.0
New York  150.0
Paris     100.0

Nested transactions not allowed:

>>> def not_composable_function(session):
...     table = session.tables["Cities"]
...     with session.tables.data_transaction(allow_nested=False):
...         table.drop()
...         table += ("Paris", 100.0)
...     assert table.row_count == 1
>>> # The function can be called in isolation:
>>> not_composable_function(session)
>>> with session.tables.data_transaction():
...     cities_table.drop()
...     cities_table += ("Berlin", 200.0)
...     # This is a programming error: the function cannot be called inside another transaction.
...     not_composable_function(session)
...     cities_table += ("New York", 150.0)
Traceback (most recent call last):
    ...
RuntimeError: Cannot start this transaction inside another transaction since nesting is not allowed.
>>> # The last transaction was rolled back:
>>> cities_table.head().sort_index()
       Price
City
Paris  100.0