atoti_kafka package

Module contents

Plugin to load real time Kafka streams into atoti stores.

This package is required to use atoti.store.Store.load_kafka().

atoti_kafka.JSON_DESERIALIZER = KafkaDeserializer(name='io.atoti.loading.kafka.impl.serialization.JsonDeserializer')

Core JSON deserializer.

Each JSON object corresponds to a row of the store, keys of the JSON object must match columns of the store.

atoti_kafka.create_deserializer(callback, *, batch_size=1)

Return a custom Kafka deserializer.

Parameters
  • callback (Callable[[str], Mapping[str, Any]]) – Function taking the record as a string and returning a mapping from a store column name to its value.

  • batch_size (int) –

    Size of the batch of records.

    If batch_size > 1 the records will be batched then deserialized when either the batch size is reached or the batch_duration passed in atoti.store.Store.load_kafka() has ellapsed. A bigger batch size means a higher latency but less store transactions so often better performance.

Example

Considering a store with columns c_1, c_2 and c_3, for Kafka records shaped like:

  • {"c_1": v_1, "c_2": v_2, "c_3": v_3}, this callback would work:

    columns = store.columns
    def callback(record: str):
        values = json.loads(record)
        return {column: values[column] for column in columns}
    
  • v_1, v_2, v_3, this callback would work:

    columns = store.columns
    def callback(record: str):
        values = record.split(", ")
        return {column: values[index] for index, column in enumerate(columns)}
    
Return type

KafkaDeserializer