Watch local files

This guide shows how to watch a local directory and load files into a table as they are added to that directory.

Atoti is designed to efficiently handle data updates in its tables. When tables are updated, the new data will automatically be taken into account when querying the session.

Let’s take the example of an application analyzing a company’s sales. Sales data is stored in one CSV file per day. Each day, a new CSV file containing that day’s sales is created, and we want to load it into the application.

Creating the session

Let’s load the CSV files from the resources/watch_local_files/current folder into an Atoti session:

[1]:
import atoti as tt
[2]:
session = tt.Session()
[3]:
sales_table = session.read_csv(
    "resources/watch_local_files/current/*.csv", table_name="Sales", keys=["ID"]
)
[4]:
cube = session.create_cube(sales_table)
l, m = cube.levels, cube.measures

At the beginning, there are only 7 rows in the table:

[5]:
initial_row_count = 7
assert len(sales_table) == initial_row_count

and just 2 dates:

[6]:
cube.query(m["Quantity.SUM"], levels=[l["Date"]])
[6]:
            Quantity.SUM
Date
2021-05-01            10
2021-05-02             7

Starting the file watcher

We’ll use the popular watchdog library to watch our directory:

[7]:
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
[8]:
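class AtotiWatcher(FileSystemEventHandler):
    """Load each newly created file into the Sales table."""

    def on_created(self, event):
        # event.src_path is the path of the file that was just created.
        sales_table.load_csv(event.src_path)


# PollingObserver checks the directory for changes at regular intervals.
observer = PollingObserver()
observer.schedule(AtotiWatcher(), "resources/watch_local_files/current")
# start() runs the observer in a background thread.
observer.start()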
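
The handler above loads whatever path a creation event reports. watchdog also reports creation events for directories, and the watched folder could receive files that are not CSVs. A minimal sketch of a filter follows, assuming only CSV files should be loaded; should_load is a hypothetical helper name, not part of watchdog or Atoti:

```python
from pathlib import Path


def should_load(src_path: str, is_directory: bool) -> bool:
    """Return True only for paths that look like CSV files (hypothetical helper)."""
    if is_directory:
        # Creation events are also emitted for new subdirectories.
        return False
    return Path(src_path).suffix.lower() == ".csv"


# Inside the handler, the check could be used like this:
#
#     def on_created(self, event):
#         if should_load(event.src_path, event.is_directory):
#             sales_table.load_csv(event.src_path)
```

Keeping the check in a plain function makes it easy to test without starting an observer or a session.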

Simulating the arrival of a new file

[9]:
from shutil import copy
[10]:
# Copy the new file ...
copy(
    "resources/watch_local_files/next/sales_2021_05_03.csv",
    "resources/watch_local_files/current",
)

# ... and briefly wait until we see that new rows have been added to the table
while len(sales_table) <= initial_row_count:
    ...
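
The loop above spins until the row count changes, which is fine for a demo but never ends if the file is not picked up. As a sketch of an alternative, the wait can sleep between checks and give up after a timeout. wait_until and its parameters are hypothetical names, and a plain dictionary stands in for the table so the example runs on its own:

```python
import threading
import time


def wait_until(condition, timeout=10.0, interval=0.1):
    """Poll condition() until it returns True or timeout seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while not condition():
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
    return True


# Stand-in for the table: a row count that a background timer increases,
# playing the role of the file watcher loading new rows.
row_count = {"value": 7}
threading.Timer(0.2, lambda: row_count.update(value=10)).start()

loaded = wait_until(lambda: row_count["value"] > 7, timeout=5.0)
never = wait_until(lambda: False, timeout=0.3)
```

With the real table, the condition would be lambda: len(sales_table) > initial_row_count.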

That’s it! The new file has been loaded and queries on the cube now reflect it: a third date shows up.

[11]:
cube.query(m["Quantity.SUM"], levels=[l["Date"]])
[11]:
            Quantity.SUM
Date
2021-05-01            10
2021-05-02             7
2021-05-03             8

Seeing widgets change in real time

We can repeat the same operation with the app open alongside the notebook, showing a pivot table that runs a real-time query. The widget rerenders automatically to display the new date:

[Figure: widget rerendering as a new file is picked up]

Going further

  • watchdog supports several kinds of file system events (such as modification, deletion, and moves), not just the file creation described here.

  • If you need to preprocess the watched files before loading them into the Atoti session, you can do it inside the on_created() method of the event handler by first reading the file with pd.read_csv(), editing the resulting dataframe, and then passing it to Table.load_pandas().

  • If you want to do multiple actions (e.g. dropping rows before loading new ones) when a file is updated, you can use Session.start_transaction().
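
The preprocessing step mentioned above can be sketched as follows. This is an illustration under assumptions: preprocess_sales is a hypothetical helper, the sample rows are made up, and dropping rows with a non-positive Quantity is only an example of a cleaning rule. Only the ID, Date, and Quantity column names come from the example above.

```python
import io

import pandas as pd


def preprocess_sales(csv_source) -> pd.DataFrame:
    """Read a sales CSV and apply an example cleaning rule (hypothetical)."""
    df = pd.read_csv(csv_source)
    # Assumed rule for illustration: keep only rows with a positive quantity.
    df = df[df["Quantity"] > 0]
    return df.reset_index(drop=True)


# Made-up sample data standing in for a watched file.
sample = io.StringIO(
    "ID,Date,Quantity\n"
    "1,2021-05-03,5\n"
    "2,2021-05-03,0\n"
    "3,2021-05-03,3\n"
)
cleaned = preprocess_sales(sample)

# Inside on_created(), the result would then be passed to the table:
#     sales_table.load_pandas(preprocess_sales(event.src_path))
```

Because pd.read_csv() accepts both paths and file-like objects, the same function works on event.src_path in the handler and on an in-memory buffer when testing.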