atoti.tables.Tables.data_transaction()
- Tables.data_transaction(scenario_name=None, *, allow_nested=True)
Create a data transaction to batch several data loading operations.
Batching is more efficient than calling load() for each operation one after the other, especially when using load_async() to load data concurrently into multiple tables. It also avoids exposing possibly incorrect intermediate states (e.g. when loading some new data requires dropping existing rows first).
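To make the consistency argument concrete, here is a minimal sketch under stated assumptions: ToyTable and its methods are hypothetical, and this is not how atoti implements data transactions. Writes made inside the transaction go to a private staged copy, and the committed state is swapped in one step when the block exits, so a reader never observes the table between a drop and the insert that follows it.

```python
from contextlib import contextmanager
from typing import Dict, Iterator, Optional


class ToyTable:
    """Hypothetical keyed table illustrating batched, atomic changes.

    A teaching model only, not atoti's implementation of data transactions.
    """

    def __init__(self, rows: Dict[str, float]) -> None:
        self._committed: Dict[str, float] = dict(rows)  # state readers can see
        self._staged: Optional[Dict[str, float]] = None  # private copy during a transaction

    def rows(self) -> Dict[str, float]:
        # Readers only ever observe committed state.
        return dict(self._committed)

    def _target(self) -> Dict[str, float]:
        # Inside a transaction, writes go to the staged copy; otherwise straight to committed state.
        return self._staged if self._staged is not None else self._committed

    def upsert(self, key: str, value: float) -> None:
        self._target()[key] = value

    def drop(self, key: str) -> None:
        self._target().pop(key, None)

    @contextmanager
    def transaction(self) -> Iterator[None]:
        self._staged = dict(self._committed)
        try:
            yield
            self._committed = self._staged  # publish every change in one step
        finally:
            self._staged = None  # on error, the staged changes are simply dropped


table = ToyTable({"Berlin": 150.0, "Paris": 200.0})
with table.transaction():
    table.drop("Paris")
    # The drop is not visible yet: no reader sees a table without Paris.
    assert "Paris" in table.rows()
    table.upsert("Paris", 100.0)
print(table.rows())  # {'Berlin': 150.0, 'Paris': 100.0}
```

Copying the whole table on every transaction keeps the sketch short but is only reasonable for toy-sized data.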
If an exception is raised during a data transaction, the transaction is rolled back and all changes made before the exception are discarded.
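The rollback guarantee can be pictured as snapshot-and-restore. The helper below is hypothetical (rollback_on_error is not part of atoti) and is built with contextlib.contextmanager; like data_transaction(), it returns a context manager that yields None, and it re-raises the original exception after restoring the snapshot.

```python
import copy
from contextlib import AbstractContextManager, contextmanager
from typing import Dict, Iterator


@contextmanager
def rollback_on_error(rows: Dict[str, float]) -> Iterator[None]:
    """Hypothetical helper: restore `rows` to a snapshot if the block raises."""
    snapshot = copy.deepcopy(rows)  # state before any change made in the block
    try:
        yield
    except BaseException:
        rows.clear()
        rows.update(snapshot)  # discard every change made before the exception
        raise  # the caller still sees the original exception


cities = {"Berlin": 150.0, "Paris": 200.0}
try:
    with rollback_on_error(cities):
        cities["New York"] = 100.0
        del cities["Paris"]
        raise Exception("Some error")
except Exception as error:
    print(error)  # Some error
print(cities)  # {'Berlin': 150.0, 'Paris': 200.0}

# The decorated function returns a context manager, the same kind of object data_transaction() returns.
assert isinstance(rollback_on_error(cities), AbstractContextManager)
```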
Note
Data transactions cannot be mixed with:
- Long-running data operations such as stream().
- Data model operations such as create_table(), join(), or defining a new measure.
- Operations on parameter tables created from create_parameter_hierarchy_from_members() and create_parameter_simulation().
- Operations on source scenarios other than the one the transaction is started on.
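One way to picture such a restriction is a guard that refuses incompatible operations while a transaction is open. The sketch is purely illustrative: ToySession, its transaction() and create_table() methods, and the error message are hypothetical and say nothing about how, or with which error, atoti enforces this rule. A depth counter is used so the guard stays correct if transactions are nested.

```python
from contextlib import contextmanager
from typing import Iterator, List


class ToySession:
    """Hypothetical session that refuses data model changes inside a transaction.

    A teaching model only: the class, its methods and the error message are invented.
    """

    def __init__(self) -> None:
        self.table_names: List[str] = []
        self._depth = 0  # number of currently open transactions

    @contextmanager
    def transaction(self) -> Iterator[None]:
        self._depth += 1
        try:
            yield
        finally:
            self._depth -= 1

    def create_table(self, name: str) -> None:
        # Data model operations are rejected while any transaction is open.
        if self._depth:
            raise RuntimeError("Cannot change the data model inside a data transaction.")
        self.table_names.append(name)


toy_session = ToySession()
toy_session.create_table("Cities")  # allowed: no transaction is open
try:
    with toy_session.transaction():
        toy_session.create_table("Countries")  # rejected by the guard
except RuntimeError as error:
    print(error)
print(toy_session.table_names)  # ['Cities']
```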
- Parameters:
allow_nested (bool) –
Whether to allow starting this transaction inside an already running one.
When False, an error is raised if this transaction is started while another transaction is already running, regardless of that outer transaction’s value of allow_nested. The benefit of passing False is that changes made in this transaction are guaranteed, unless rolled back, to be visible to the statements outside the transaction. The drawback is that it prevents splitting transaction steps into small composable functions.
When nested transactions are allowed, changes made by inner transactions contribute transparently to the outer transaction.
scenario_name (str | None) – The name of the source scenario impacted by all the table operations inside the transaction.
- Return type:
AbstractContextManager[None]
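The allow_nested rules can be modelled with a depth counter. In this hypothetical sketch (ToyTransactions is not an atoti class), only the outermost transaction commits and inner transactions add their changes to it. A transaction started with allow_nested=False inside another one raises before doing anything, whatever flag the outer transaction used, and that error then rolls back the outer transaction. The error message mirrors the one in the examples.

```python
from contextlib import contextmanager
from typing import Iterator, List


class ToyTransactions:
    """Hypothetical model of the nesting rules; not atoti's implementation."""

    def __init__(self) -> None:
        self.depth = 0  # number of currently open transactions
        self.committed: List[str] = []  # changes visible outside any transaction
        self._pending: List[str] = []  # changes awaiting the outermost commit

    def record(self, change: str) -> None:
        (self._pending if self.depth else self.committed).append(change)

    @contextmanager
    def transaction(self, allow_nested: bool = True) -> Iterator[None]:
        # Only this transaction's own flag matters, not the outer one's.
        if self.depth and not allow_nested:
            raise RuntimeError(
                "Cannot start this transaction inside another transaction"
                " since nesting is not allowed."
            )
        outer = self.depth == 0
        self.depth += 1
        try:
            yield
        except BaseException:
            if outer:
                self._pending.clear()  # roll back everything, including inner changes
            raise
        else:
            if outer:
                self.committed.extend(self._pending)  # inner changes commit with the outer one
                self._pending.clear()
        finally:
            self.depth -= 1


tx = ToyTransactions()
with tx.transaction():
    tx.record("Berlin")
    with tx.transaction():  # nesting is allowed by default
        tx.record("Paris")
print(tx.committed)  # ['Berlin', 'Paris']

try:
    with tx.transaction():
        tx.record("New York")
        with tx.transaction(allow_nested=False):  # raises before its body runs
            tx.record("never recorded")
except RuntimeError as error:
    print(error)
print(tx.committed)  # ['Berlin', 'Paris']
```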
Example
>>> import pandas as pd
>>> cities_df = pd.DataFrame(
...     columns=["City", "Price"],
...     data=[
...         ("Berlin", 150.0),
...         ("London", 240.0),
...         ("New York", 270.0),
...         ("Paris", 200.0),
...     ],
... )
>>> cities_table = session.read_pandas(
...     cities_df,
...     keys={"City"},
...     table_name="Cities",
... )
>>> extra_cities_df = pd.DataFrame(
...     columns=["City", "Price"],
...     data=[
...         ("Singapore", 250.0),
...     ],
... )
>>> with session.tables.data_transaction():
...     cities_table += ("New York", 100.0)
...     cities_table.drop(cities_table["City"] == "Paris")
...     cities_table.load(extra_cities_df)
>>> cities_table.head().sort_index()
           Price
City
Berlin     150.0
London     240.0
New York   100.0
Singapore  250.0
If an exception is raised during a data transaction, all changes made before the exception are rolled back:
>>> cities_table.drop()
>>> cities_table.load(cities_df)
>>> cities_table.head().sort_index()
          Price
City
Berlin    150.0
London    240.0
New York  270.0
Paris     200.0
>>> with session.tables.data_transaction():
...     cities_table += ("New York", 100.0)
...     cities_table.drop(cities_table["City"] == "Paris")
...     cities_table.load(extra_cities_df)
...     raise Exception("Some error")
Traceback (most recent call last):
    ...
Exception: Some error
>>> cities_table.head().sort_index()
          Price
City
Berlin    150.0
London    240.0
New York  270.0
Paris     200.0
Loading data concurrently in multiple tables:
>>> import asyncio
>>> countries_table = session.create_table(
...     "Countries",
...     data_types={"City": "String", "Country": "String"},
...     keys={"City"},
... )
>>> cities_table.join(countries_table)
>>> countries_df = pd.DataFrame(
...     columns=["City", "Country"],
...     data=[
...         ("Berlin", "Germany"),
...         ("London", "England"),
...         ("New York", "USA"),
...         ("Paris", "France"),
...     ],
... )
>>> async def load_data_in_all_tables(tables):
...     with tables.data_transaction():
...         await asyncio.gather(
...             tables["Cities"].load_async(cities_df),
...             tables["Countries"].load_async(countries_df),
...         )
>>> cities_table.drop()
>>> asyncio.run(load_data_in_all_tables(session.tables))
>>> cities_table.head()
          Price
City
Berlin    150.0
London    240.0
New York  270.0
Paris     200.0
>>> countries_table.head()
          Country
City
Berlin    Germany
London    England
New York      USA
Paris      France
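The concurrency in this example comes from asyncio.gather(), which runs several awaitables on the same event loop and waits for all of them. The stdlib-only sketch below mimics that shape with hypothetical stand-ins (toy_transaction and toy_load_async are not atoti functions): the synchronous with block stays open across the await, so both interleaved loads happen inside the same transaction.

```python
import asyncio
from contextlib import contextmanager
from typing import Dict, Iterator, List, Tuple, TypeVar

V = TypeVar("V")


@contextmanager
def toy_transaction(log: List[str]) -> Iterator[None]:
    """Hypothetical transaction marker: records when it opens and commits."""
    log.append("begin")
    yield
    log.append("commit")


async def toy_load_async(
    name: str, table: Dict[str, V], rows: List[Tuple[str, V]], log: List[str]
) -> None:
    """Hypothetical stand-in for an asynchronous load into one table."""
    for key, value in rows:
        await asyncio.sleep(0)  # hand control back so the other load can progress
        table[key] = value
        log.append(name)


async def load_data_in_all_tables(
    cities: Dict[str, float], countries: Dict[str, str], log: List[str]
) -> None:
    # The `with` block stays open until both concurrent loads have finished.
    with toy_transaction(log):
        await asyncio.gather(
            toy_load_async("Cities", cities, [("Berlin", 150.0), ("Paris", 200.0)], log),
            toy_load_async(
                "Countries", countries, [("Berlin", "Germany"), ("Paris", "France")], log
            ),
        )


cities: Dict[str, float] = {}
countries: Dict[str, str] = {}
log: List[str] = []
asyncio.run(load_data_in_all_tables(cities, countries, log))
print(cities)  # {'Berlin': 150.0, 'Paris': 200.0}
print(countries)  # {'Berlin': 'Germany', 'Paris': 'France'}
```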
Nested transactions allowed:
>>> def composable_function(session):
...     table = session.tables["Cities"]
...     with session.tables.data_transaction():
...         table += ("Paris", 100.0)
>>> cities_table.drop()
>>> # The function can be called in isolation:
>>> composable_function(session)
>>> cities_table.head().sort_index()
       Price
City
Paris  100.0
>>> with session.tables.data_transaction(
...     allow_nested=False  # No-op because this is the outer transaction.
... ):
...     cities_table.drop()
...     cities_table += ("Berlin", 200.0)
...     # The function can also be called inside another transaction and will contribute to it:
...     composable_function(session)
...     cities_table += ("New York", 150.0)
>>> cities_table.head().sort_index()
          Price
City
Berlin    200.0
New York  150.0
Paris     100.0
Nested transactions not allowed:
>>> def not_composable_function(session):
...     table = session.tables["Cities"]
...     with session.tables.data_transaction(allow_nested=False):
...         table.drop()
...         table += ("Paris", 100.0)
...     assert table.row_count == 1
>>> # The function can be called in isolation:
>>> not_composable_function(session)
>>> with session.tables.data_transaction():
...     cities_table.drop()
...     cities_table += ("Berlin", 200.0)
...     # This is a programming error: the function cannot be called inside another transaction.
...     not_composable_function(session)
...     cities_table += ("New York", 150.0)
Traceback (most recent call last):
    ...
RuntimeError: Cannot start this transaction inside another transaction since nesting is not allowed.
>>> # The last transaction was rolled back:
>>> cities_table.head().sort_index()
       Price
City
Paris  100.0
See also