Top 5 Hurdles for Intermediate Flux Users and Resources for Optimizing Flux
Now that you’ve read through Top 5 Hurdles for Flux Beginners and Resources for Learning to Use Flux, you’re well on your way to being an advanced Flux user. This post includes some of the top hurdles for intermediate Flux users as well as solutions.
Hurdle 1: Unaware of common Flux Packages and Utility Functions
Sometimes the largest hurdle that people face in life is simply not being aware of existing solutions. While Flux has a wide variety of Flux Packages and Utility Functions, there are a few that you can’t really live without if you plan to write Flux. If you’re not already familiar with the common Flux Packages and Utility Functions described below, I highly recommend taking advantage of them.
Solution 1: Some important Flux tips and Flux utilities to make writing Flux easier
Here are four Flux Packages and Utility Functions you should be aware of if you plan to write any Flux:
map(): This function is critical to working with Flux. It applies a function to each row in the input tables and assigns the output to a specified column. If you’re not familiar with map() already, take a moment to look at the documentation. We’ll use it below too.
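If you're not familiar with map() yet, here's a minimal sketch (the bucket and field names are hypothetical) that converts a temperature field from Celsius to Fahrenheit:
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._field == "temp_celsius")
    // Rewrite the _value column in every row
    |> map(fn: (r) => ({r with _value: r._value * 9.0 / 5.0 + 32.0}))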
aggregateWindow(): the aggregateWindow() function allows you to apply an aggregate or selector function (like mean(), count(), min(), max(), sum(), first(), or last()) to your data within windows of time specified by the every parameter. You're probably already familiar with aggregateWindow(), as it's applied to your data by default when you use the Query Builder in the InfluxDB UI. For example, you might apply a mean() to your data for every 1m window with:
from(bucket:"items")
|> range(start:-1h)
|> aggregateWindow(every: 1m, fn: mean())
But did you know that you can create your own utility function to aggregate data for your use case? Pass your own function in the fn parameter to do so. For example, imagine we want to write a custom aggregation function that calculates 2 times the mean() of our data. To pass this custom aggregation function into aggregateWindow(), first define it as a variable. Note that your custom function must include a column parameter, because aggregateWindow() only operates on functions with that input parameter.
multByX = (x) => (column, tables=<-) => tables
    |> mean(column: column)
    |> map(fn: (r) => ({r with _value: r._value * x}))

from(bucket: "my-bucket")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "cpu")
    |> filter(fn: (r) => r["_field"] == "usage_system")
    |> limit(n: 10)
    |> aggregateWindow(every: v.windowPeriod, fn: multByX(x: 2.0))
Flux InfluxDB Monitor package: This package provides tools for monitoring and alerting with InfluxDB. If you've created an alert through the UI, you've created either a simple threshold alert or a deadman alert. The functions in this package, however, can help you craft more sophisticated checks and custom alerts. We recommend crafting the advanced monitoring logic within a task and then applying a threshold or deadman alert to the output of that task. For example, you could label numerical data with custom statuses (e.g. whether the slope of the data is negative or positive) using the map() function within a task. The stateChanges() function detects changes in a column over time, so you can use those descriptive statuses in conjunction with stateChanges() to detect every time the slope of your data changes. Write those slope-change events to a new bucket with a task and use a threshold alert to receive notifications.
Please refer to Tasks and Checks for Monitoring with InfluxDB and How to Monitor States with InfluxDB for more information and examples of how to create custom tasks and how to use custom statuses in conjunction with the monitoring package, respectively. Please refer to the deadman() function documentation for an example of how to use the monitoring package in conjunction with the subDuration() function to detect when a host hasn't reported data for the last 5 minutes.
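To make this concrete, here's a rough sketch of such a task. The bucket names (my-bucket and slope_events) are hypothetical; it labels each point with a status based on the sign of its derivative, then uses monitor.stateChanges() to surface the transitions:
import "influxdata/influxdb/monitor"

option task = {name: "slopeChanges", every: 10m}

from(bucket: "my-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system")
    |> derivative(unit: 1m, nonNegative: false)
    // Label each row with a custom status in the _level column
    |> map(fn: (r) => ({r with _level: if r._value >= 0.0 then "ok" else "crit"}))
    // Keep only the rows where the status flipped from "ok" to "crit"
    |> monitor.stateChanges(fromLevel: "ok", toLevel: "crit")
    |> to(bucket: "slope_events")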
Functions to manipulate timestamps: Time series data can be difficult to work with, and Flux includes several functions for working with time. However, a couple of these functions are spread out between various community package contributions and Flux packages within the documentation. A consolidated list of all of these functions provides a deeper understanding of how you can use Flux to work with time. The following functions allow you to manipulate your timestamps and execute a variety of time series transformations (a few of them appear in the sketch after this list):
- hourSelection(): select data between specific hours of the day
- duration(): convert a value to a duration in terms of seconds, minutes, hours, etc.
- events.duration(): calculate the duration between events
- now(): return the current time
- system.time(): return the current time of the system
- time(): convert a Unix nanosecond timestamp to an RFC3339 timestamp
- uint(): convert an RFC3339 timestamp to a Unix nanosecond timestamp
- truncateTimeColumn(): truncate every timestamp in the _time column to a specific unit
- date.truncate(): round or truncate a single timestamp down to a specific unit
- experimental.addDuration()*: add a duration to a timestamp
- experimental.subDuration()*: subtract a duration from a timestamp
- experimental.alignTime()*: compare data across windows; i.e., week over week or month over month
* functions from the Flux Experimental Package. This package includes a wide variety of functions beyond time series transformations that might be useful to you.
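As a quick illustration, here's a minimal sketch that combines a few of these functions (the bucket and measurement names are hypothetical). It queries the last week of data, keeps only business hours, and truncates timestamps to the minute:
import "experimental"

from(bucket: "my-bucket")
    |> range(start: experimental.subDuration(d: 7d, from: now()))
    |> filter(fn: (r) => r._measurement == "cpu")
    // Keep only points recorded between 9am and 5pm
    |> hourSelection(start: 9, stop: 17)
    // Truncate every _time value down to the minute
    |> truncateTimeColumn(unit: 1m)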
Hurdle 2: Unaware of performance gains that make your query faster
When crafting a Flux script, there are certain guidelines to follow in order to optimize your query for faster performance.
Solution 2: Learning about memory optimizations and new pushdown patterns to optimize your Flux scripts
In order to provide context for the optimization guidelines, let’s first take a moment to understand how Flux works. Flux is able to query data efficiently because some functions push down the data transformation workload to storage rather than performing the transformations in memory. Combinations of functions that do this work are called pushdown patterns.
Originally, the only pushdown pattern was the basic Flux query: from() |> range() |> filter(). Since the initial release of InfluxDB Cloud in September 2019, new pushdown patterns have been added to Flux. The following pushdown patterns are now also supported:
Bare aggregates and selectors:
|> count()
|> sum()
|> first()
|> last()
|> min()
|> max()
|> mean()
Group + bare aggregates and selectors:
|> group() |> count()
|> group() |> sum()
- …etc.
Window + bare aggregates and selectors:
|> window() |> count()
|> window() |> sum()
- …etc.
AggregateWindow + bare aggregates and selectors:
|> aggregateWindow(fn: count)
|> aggregateWindow(fn: sum)
- …etc.
Append any of those patterns to your basic Flux query, from() |> range() |> filter(), to take advantage of the increase in query performance. If you would like the Docs team to provide a public list of the pushdown patterns being added to Flux, please don't hesitate to reach out on our community site or in our Slack channel and let us know.
For any of you advanced Flux users wanting more detail about how Flux operates, the image below is a visualization of the way that Flux executes a query. First, you write a query; then the Flux planner checks whether your query matches an existing pushdown pattern. If there's a match, the planner writes a plan for how to perform your query. Next, the Flux executor executes your query by invoking hidden operations that initiate a storage read API call. These hidden operations differ based on your pushdown pattern. For example, if we execute a query with from() |> range() |> filter() |> group() |> max(), a corresponding hidden storage operation, called ReadGroupMax, initiates the data transformation on the storage side. The data is then streamed back to Flux via gRPC, where Flux can convert it to Flux tables or Annotated CSV.
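For instance, the following query (with a hypothetical bucket name) matches the Group + max pushdown pattern end to end, so the transformation work happens in storage rather than in memory:
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system")
    // group() + max() maps to the hidden ReadGroupMax storage operation
    |> group(columns: ["host"])
    |> max()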
Hurdle 3: Using schema mutation functions at the wrong time
Users can accidentally decrease their query performance by applying schema mutation functions in the wrong place in their Flux query.
Solution 3: Learn when to apply schema mutations
Schema mutation functions are any functions that change the columns in your Flux tables. They include functions like keep(), drop(), rename(), duplicate(), and set(). If you can, it's best to reserve these functions for the end of your Flux script. If you're using an aggregate or selector function in your query, apply the schema mutation functions after the aggregation to preserve any pushdown patterns you might have, as in the sketch below.
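Here's a minimal sketch (the bucket, field, and tag names are hypothetical). Because keep() runs after the bare aggregate, the from() |> range() |> filter() |> mean() pushdown stays intact; moving a mutation like drop() or keep() before mean() would break it and force the work into memory:
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system")
    // The bare aggregate completes the pushdown pattern in storage
    |> mean()
    // Schema mutation happens last, in memory, on far fewer rows
    |> keep(columns: ["_value", "host"])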
Hurdle 4: Unsure about task writing best practices
Sometimes users are unaware of how to best take advantage of the UI to craft tasks. Additionally, some users might not be aware of how to take advantage of variables to improve the efficiency of their tasks.
Solution 4: Task writing recommendations
If you’ve read any of my other posts, you know that I truly believe the InfluxDB UI is your best friend when it comes to learning how to write Flux. Learning how to craft tasks is no exception. After you’ve crafted your task in the Data Explorer, use the Save As button in the top right corner to convert your Flux script to a task.
The Flux script below is an example of a generic Flux task:
option task = {name: "rateTask", every: 1h}

rate = from(bucket: "items")
    |> range(start: -task.every)
    |> filter(fn: (r) => (r._measurement == "totalItems"))
    |> filter(fn: (r) => (r._field == "items"))
    |> group(columns: ["itemGroupName"])
    |> aggregateWindow(fn: sum, every: task.every)
    |> map(fn: (r) => ({
        _time: r._time,
        _stop: r._stop,
        _start: r._start,
        _measurement: task.name,
        newTag: "rate",
        _value: r._value,
        itemGroupName: r.itemGroupName
    }))

rate
    |> to(bucket: "newItems")
Use the task configuration options to specify how you want your task to be executed. While you can configure the name, every, cron, and offset options with the UI, there are additional options available to you; you can also decide whether you want to include concurrency or retry options. In the example above, we use the every option to specify how frequently we want the task to run and how frequently we want to aggregate data with the aggregateWindow() function. We also use the name option to rename our measurement easily. Use the map() function to add a new field or tag to your data. Adding new fields or tags can be helpful if you want to differentiate various task outputs in a shared destination bucket. In the example above, we're adding a tag, "newTag", to our data. Now we can easily query the output of that task with:
from(bucket: "newItems")
    |> range(start: -1h)
    |> filter(fn: (r) => (r._measurement == "rateTask"))
    |> filter(fn: (r) => (r.newTag == "rate"))
Finally, if you're performing multiple data transformations or aggregations within your task, you'll want to use variables to improve the efficiency of that task. For example, if you wanted to downsample CPU stats, your task might look like this:
option task = {name: "Downsampling CPU", every: 1m}data = from(bucket: "my-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "my_measurement")data
|> mean()
|> set(key: "agg_type",value: "mean_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])data
|> count()
|> set(key: "agg_type",value: “count_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])
Perform the top-level query and store the results in a variable, then use that variable to calculate the mean and count. Storing the output of the top-level query in a variable, instead of querying for the data twice, cuts the workload on the Flux query engine roughly in half. Here we use the set() function instead of the map() function to add a tag value to our data. Finally, it's worth noting that you could write a similar task with the aggregateWindow() function instead: the task could run every week, and you could calculate the count and mean for every hour within the aggregateWindow() function. However, using aggregateWindow() to calculate multiple aggregations over a long range of time places more work on the Flux query engine than a task that calculates the aggregations over a short range on a frequent basis; a sketch of that alternative follows.
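For comparison, here's a rough sketch of that aggregateWindow() variant, using the same hypothetical bucket and measurement names as above:
option task = {name: "Downsampling CPU weekly", every: 1w}

data = from(bucket: "my-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "my_measurement")

data
    |> aggregateWindow(every: 1h, fn: mean)
    |> set(key: "agg_type", value: "mean_temp")
    |> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])

data
    |> aggregateWindow(every: 1h, fn: count)
    |> set(key: "agg_type", value: "count_temp")
    |> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])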
Hurdle 5: Unsure about when to write a downsampling task
Users are sometimes unsure about when they should write a downsampling task. They might have questions like: “How do I know when I need a downsampling task?” or “How do downsampling tasks affect my cardinality?”.
Solution 5: Learn about when to write a downsampling task
The solution to this hurdle is not simple. I will try to summarize it here, but I strongly encourage you to refer to Downsample and retain data and Downsampling with InfluxDB v2.0 for detailed explanations.
There are two main reasons why users write downsampling tasks. First, you need to improve the visualization of your time series data and the performance of your dashboards. Consider aggregating your data to capture overall trends, reduce noise, and increase your query and visualization performance. Second, you recognize that your data has unbounded tags and you might experience runaway series cardinality. In this case, consider keeping the raw data with unbounded tags for a short period of time by using a short retention policy. Then use a downsampling task to remove the problem tag and aggregate your data as you see fit; you might be able to remove said tag with the drop() function, for example (see the sketch below). You can use the operational monitoring template to identify when you have a cardinality problem.
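Here's a minimal sketch of such a downsampling task; the bucket names and the unbounded tag (session_id) are hypothetical:
option task = {name: "dropRunawayTag", every: 1h}

from(bucket: "raw-short-retention")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "requests")
    // Remove the unbounded tag that causes runaway cardinality
    |> drop(columns: ["session_id"])
    // Aggregate the remaining series before writing them to long-term storage
    |> aggregateWindow(every: 10m, fn: mean)
    |> to(bucket: "downsampled-long-retention")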
Please refer to Monitoring Tasks and Finding the Source of Runaway Cardinality for more information on how to identify when you have a cardinality problem.
Next steps for tackling hurdles for intermediate Flux users
I hope this post helped you tackle some of the difficult Flux questions you might have. If you still have problems writing Flux or want advice on how to optimize your Flux scripts, please ask us for help and share your story! Share your thoughts, concerns, or questions in the comments section, on our community site, or in our Slack channel. We’d love to get your feedback and help you with any problems you run into! Your good questions are the inspiration for these posts.