The hot data store holds a rolling window of recent data, while all data continues to be written to blob storage. Queries to the hot data store execute significantly faster than queries to blob storage.
To configure the hot data store, add the recent_data_store configuration to your component’s data capture settings. Set the value of the stored_hours field to the number of hours of recent data you would like to store.
For example, the following configuration stores 24 hours of data in the hot data store:
{
  "components": [
    {
      "name": "sensor-1",
      "api": "rdk:component:sensor",
      "model": "rdk:builtin:fake",
      "attributes": {},
      "service_configs": [
        {
          "type": "data_manager",
          "attributes": {
            "capture_methods": [
              {
                "method": "Readings",
                "capture_frequency_hz": 0.5,
                "additional_params": {},
                "recent_data_store": {
                  "stored_hours": 24
                }
              }
            ]
          }
        }
      ]
    }
  ]
}
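If you generate or edit machine configuration programmatically, the same change can be applied to a component's JSON before you save it. The following is a minimal sketch that works on plain dictionaries only. The helper name with_hot_data_store is hypothetical and is not part of any Viam SDK; the field names come from the configuration above.

```python
import copy
import json


def with_hot_data_store(component: dict, method: str, stored_hours: int) -> dict:
    """Return a copy of a component config with recent_data_store set
    on the named capture method. Hypothetical helper, not a Viam SDK call."""
    if stored_hours <= 0:
        raise ValueError("stored_hours must be a positive number of hours")

    updated = copy.deepcopy(component)  # leave the caller's config untouched
    matched = False
    for service in updated.get("service_configs", []):
        if service.get("type") != "data_manager":
            continue
        for capture in service.get("attributes", {}).get("capture_methods", []):
            if capture.get("method") == method:
                capture["recent_data_store"] = {"stored_hours": stored_hours}
                matched = True

    if not matched:
        raise KeyError(f"no capture method named {method!r} found")
    return updated


# Component config from the example above, without recent_data_store.
sensor = {
    "name": "sensor-1",
    "api": "rdk:component:sensor",
    "model": "rdk:builtin:fake",
    "attributes": {},
    "service_configs": [
        {
            "type": "data_manager",
            "attributes": {
                "capture_methods": [
                    {
                        "method": "Readings",
                        "capture_frequency_hz": 0.5,
                        "additional_params": {},
                    }
                ]
            },
        }
    ],
}

updated_sensor = with_hot_data_store(sensor, "Readings", 24)
print(json.dumps(updated_sensor, indent=2))
```

The helper copies the input rather than mutating it, so you can compare the configuration before and after the change.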
By default, queries execute on blob storage, which is slower than querying a hot data store. If you have configured a hot data store, you must specify it as the data source in each query that should use it.
In Python, use DataClient.tabular_data_by_mql with tabular_data_source_type set to TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE to query your hot data store:
import asyncio

from viam.rpc.dial import DialOptions, Credentials
from viam.app.viam_client import ViamClient
from viam.gen.app.data.v1.data_pb2 import TabularDataSourceType

# Configuration constants – replace with your actual values
API_KEY = ""  # API key, find or create in your organization settings
API_KEY_ID = ""  # API key ID, find or create in your organization settings
ORG_ID = ""  # Organization ID, find or create in your organization settings


async def connect() -> ViamClient:
    """Establish a connection to the Viam client using API credentials."""
    dial_options = DialOptions(
        credentials=Credentials(
            type="api-key",
            payload=API_KEY,
        ),
        auth_entity=API_KEY_ID
    )
    return await ViamClient.create_from_dial_options(dial_options)


async def main() -> int:
    viam_client = await connect()
    data_client = viam_client.data_client

    tabular_data = await data_client.tabular_data_by_mql(
        organization_id=ORG_ID,
        query=[
            {
                "$match": {
                    "component_name": "sensor-1"
                }
            },
            {
                "$limit": 10
            }
        ],
        tabular_data_source_type=TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE,
    )
    print(f"Tabular Data: {tabular_data}")

    viam_client.close()
    return 0


if __name__ == "__main__":
    asyncio.run(main())
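In Go, use DataClient.TabularDataByMQL with the TabularDataSourceType option set to the hot storage source type (datapb.TabularDataSourceType_TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE, passed as the numeric value 2 in the example) to query your hot data store: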
package main

import (
	"context"
	"fmt"

	"go.viam.com/rdk/app"
	"go.viam.com/rdk/logging"
)

func main() {
	// Replace with your actual values
	apiKey := ""
	apiKeyID := ""
	orgID := ""

	logger := logging.NewDebugLogger("client")
	ctx := context.Background()
	viamClient, err := app.CreateViamClientWithAPIKey(
		ctx, app.Options{}, apiKey, apiKeyID, logger)
	if err != nil {
		logger.Fatal(err)
	}
	defer viamClient.Close()

	dataClient := viamClient.DataClient()

	// Create MQL stages as map slices
	mqlStages := []map[string]interface{}{
		{"$match": map[string]interface{}{"component_name": "sensor-1"}},
		{"$limit": 10},
	}

	tabularData, err := dataClient.TabularDataByMQL(ctx, orgID, mqlStages, &app.TabularDataByMQLOptions{
		// 2 selects the hot data store (TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE)
		TabularDataSourceType: 2,
	})
	if err != nil {
		logger.Fatal(err)
	}

	fmt.Printf("Tabular Data: %v\n", tabularData)
}
In TypeScript, use dataClient.tabularDataByMQL with the tabularDataSourceType option set to TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE (passed as the numeric value 2 in the example) to query your hot data store:
import { createViamClient } from "@viamrobotics/sdk";

// Configuration constants – replace with your actual values
const API_KEY = ""; // API key, find or create in your organization settings
const API_KEY_ID = ""; // API key ID, find or create in your organization settings
const ORG_ID = ""; // Organization ID, find or create in your organization settings

async function main(): Promise<void> {
  // Create Viam client
  const client = await createViamClient({
    credentials: {
      type: "api-key",
      authEntity: API_KEY_ID,
      payload: API_KEY,
    },
  });

  const tabularData = await client.dataClient.tabularDataByMQL(
    ORG_ID,
    [
      { "$match": { "component_name": "sensor-1" } },
      { "$limit": 10 }
    ],
    {
      // 2 selects the hot data store (TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE)
      tabularDataSourceType: 2,
    }
  );
  console.log(tabularData);
}

// Run the script
main().catch((error) => {
  console.error("Script failed:", error);
  process.exit(1);
});
Queries to the hot data store return only data that is in the hot data store, which holds only the time window you specified in your configuration. For example, if you query a hot data store with 24 hours of rolling storage for temperature data above 25°C, and no temperature above 25°C was recorded in the last 24 hours, the query returns zero results. To query the entire history of your data, use blob storage.
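The effect of the rolling window can be illustrated with a small local simulation. This sketch does not call the Viam API. The readings, the fixed timestamp, and the helper in_hot_window are made up for illustration, and the 24-hour window mirrors the stored_hours value from the example configuration.

```python
from datetime import datetime, timedelta, timezone

STORED_HOURS = 24  # mirrors stored_hours in the example configuration


def in_hot_window(time_received: datetime, now: datetime,
                  stored_hours: int = STORED_HOURS) -> bool:
    """Return True if a reading is recent enough to be in the rolling window.
    Hypothetical helper that only models the window locally."""
    return now - time_received <= timedelta(hours=stored_hours)


# Fixed reference time so the example is reproducible.
now = datetime(2025, 1, 2, 12, 0, tzinfo=timezone.utc)

# Made-up readings: the only one above 25 degrees is 30 hours old.
readings = [
    {"time_received": now - timedelta(hours=30), "temperature": 27.5},
    {"time_received": now - timedelta(hours=20), "temperature": 22.0},
    {"time_received": now - timedelta(hours=1), "temperature": 24.1},
]

# Querying the full history finds the old reading.
full_history = [r for r in readings if r["temperature"] > 25]

# Querying only the rolling window finds nothing.
hot_only = [
    r for r in readings
    if in_hot_window(r["time_received"], now) and r["temperature"] > 25
]

print(len(hot_only), len(full_history))  # prints "0 1"
```

The same filter matches one reading over the full history and none inside the window, which is why a query over older data needs to target blob storage instead.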
Hot data store queries only support a subset of MQL aggregation operators. For more information, see Supported aggregation operators.