This is part two of Getting Started Tutorials for the TICK Stack. If you’re new to the TICK Stack, I recommend first learning about different methods for writing static data in batches to InfluxDB in part one of this Getting Started series. This is a beginner’s tutorial for how and when to write real-time data to InfluxDB using:

  • Telegraf and the Exec Plugin
  • Telegraf and the Tail Plugin

The repo for this tutorial is here. For this tutorial I used Alpha Vantage’s free “Digital & Crypto Currencies Realtime” API to get the data. After you claim your key, Alpha Vantage offers real-time intraday BTC price and volume data in 5 min resolution. Here is an example of the output data.

1. Telegraf and the Exec Plugin

#exec.py
import requests
from alphavantage_auth import key
import datetime
import time
#Using Alpha Vantage to get BTC prices every 5 minutes
#Get your key here: https://www.alphavantage.co/support/#api-key
apikey = key
url = "https://www.alphavantage.co/query?"
function = "DIGITAL_CURRENCY_INTRADAY"
symbol = "BTC"
market = "USD"
#build target url
target_url = url + "function=" + function + "&symbol=" + symbol + "&market=" + market + "&apikey=" + apikey
#make request
def data_request():
    data = requests.get(target_url).json()
    #data is returned in the following format: https://www.alphavantage.co/query?function=DIGITAL_CURRENCY_INTRADAY&symbol=BTC&market=EUR&apikey=demo
    series = data['Time Series (Digital Currency Intraday)']
    #we only want the last datapoint (the first entry in the response)
    t = list(series)[0]
    latest = series[t]
    #convert human readable time to unix time (interpreted in the local time zone)
    t = datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S")
    unix = int(time.mktime(t.timetuple()))
    #convert timestamp to nanosecond precision
    unix_ns = str(unix) + "000000000"
    #convert to line protocol
    line = ("price"
        + ",type=BTC"
        + " "
        + "price=" + str(latest['1a. price (USD)']) + ","
        + "volume=" + str(latest['2. volume'])
        + " " + unix_ns)
    return line
#the Exec plugin reads the command's stdout, so print the line
print(data_request())
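To make the line protocol format used in the script easier to see, here is a minimal sketch of a helper that builds the same kind of line. The function names to_unix_ns and to_line_protocol are hypothetical and not part of the repo. The sketch assumes the API timestamps are in UTC, which is worth confirming against the response metadata, and it skips the escaping that line protocol requires for spaces, commas, and equals signs in tag keys and values.

```python
import calendar
import datetime


def to_unix_ns(timestamp, fmt="%Y-%m-%d %H:%M:%S"):
    """Convert a timestamp string to nanoseconds since the epoch.

    Assumption: the string is in UTC (not confirmed by the tutorial).
    """
    parsed = datetime.datetime.strptime(timestamp, fmt)
    return calendar.timegm(parsed.timetuple()) * 1_000_000_000


def to_line_protocol(measurement, tags, fields, unix_ns):
    """Build one line: measurement,tag=value field=value,field=value timestamp."""
    # Tags are sorted by key; no escaping is done in this sketch.
    tag_part = "".join(",{}={}".format(k, v) for k, v in sorted(tags.items()))
    field_part = ",".join("{}={}".format(k, v) for k, v in fields.items())
    return "{}{} {} {}".format(measurement, tag_part, field_part, unix_ns)


if __name__ == "__main__":
    # Made-up values, only to show the shape of the output line.
    ns = to_unix_ns("2018-06-01 12:05:00")
    print(to_line_protocol("price", {"type": "BTC"},
                           {"price": 7500.12, "volume": 1234.5}, ns))
```

Keeping the timestamp conversion separate from the string building makes it easier to change the time zone handling later without touching the line format.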

Then I configure my exec.conf file which contains the Exec Input and the InfluxDB Output with these changes:

  • Set the data collection interval to 300s (5 min) since Alpha Vantage only offers data in 5 min resolution (line 28): interval = "300s"
  • Specify the timestamp precision (line 65): precision = "ns"
  • Omit the “host” tag (line 78): omit_hostname = true
  • Specify server by uncommenting (line 93): urls = ["http://127.0.0.1:8086"] # required
  • Create a target database to write the data to (line 96): database = "exec"
  • Specify the command for the exec plugin to execute (line 228): commands = ["python /Users/anaisdotis-georgiou/Desktop/GettingStarted_StreamingData/exec.py" ]
  • Since I only have one command, I can comment out the name suffix (line 235): #name_suffix = "_mycollector"
  • Specify the data format to consume. Since the data has been converted to line protocol, influx is chosen (line 241): data_format = "influx"

Now I can navigate to the directory where my exec.conf file resides and run telegraf -config exec.conf. I am able to see the first point immediately, but I suggest coming back after 30 minutes to make sure that Telegraf is successfully writing real-time data.
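One way to check that points keep arriving is to query the database over HTTP. The sketch below assumes an InfluxDB 1.x server at http://127.0.0.1:8086 with its /query endpoint, the "exec" database from the config above, and the "price" measurement written by the script. The function names are hypothetical and only for illustration.

```python
import json
import urllib.parse
import urllib.request


def build_query_url(base_url, database, measurement, limit=5):
    """Return an InfluxDB 1.x /query URL for the newest points in a measurement."""
    query = 'SELECT * FROM "{}" ORDER BY time DESC LIMIT {}'.format(measurement, limit)
    # epoch=s asks the server for timestamps as Unix seconds.
    params = urllib.parse.urlencode({"db": database, "q": query, "epoch": "s"})
    return "{}/query?{}".format(base_url.rstrip("/"), params)


def extract_rows(response):
    """Flatten an InfluxDB 1.x JSON query response into a list of dicts."""
    rows = []
    for result in response.get("results", []):
        # A result with no matching points has no "series" key.
        for series in result.get("series", []):
            for values in series["values"]:
                rows.append(dict(zip(series["columns"], values)))
    return rows


def fetch_rows(url):
    """Run the query against a live server and return the flattened rows."""
    with urllib.request.urlopen(url) as reply:
        return extract_rows(json.load(reply))
```

With Telegraf running, calling fetch_rows(build_query_url("http://127.0.0.1:8086", "exec", "price")) after a few intervals should return one row per collected point. An empty list suggests that nothing has been written yet.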

2. Telegraf and the Tail Plugin

import requests
import time
from alphavantage_auth import key
import datetime
#Using Alpha Vantage to get BTC prices every 5 minutes
#Get your key here: https://www.alphavantage.co/support/#api-key
apikey = key
url = "https://www.alphavantage.co/query?"
function = "DIGITAL_CURRENCY_INTRADAY"
symbol = "BTC"
market = "USD"
#build target url
target_url = url + "function=" + function + "&symbol=" + symbol + "&market=" + market + "&apikey=" + apikey
while True:
    #make request
    data = requests.get(target_url).json()
    #data is returned in the following format: https://www.alphavantage.co/query?function=DIGITAL_CURRENCY_INTRADAY&symbol=BTC&market=EUR&apikey=demo
    series = data['Time Series (Digital Currency Intraday)']
    #we only want the last datapoint (the first entry in the response)
    t = list(series)[0]
    latest = series[t]
    #convert human readable time to unix time (interpreted in the local time zone)
    t = datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S")
    unix = int(time.mktime(t.timetuple()))
    #convert to nanosecond precision
    unix_ns = str(unix) + "000000000"
    #convert to line protocol
    line = ("price"
        + ",type=BTC"
        + " "
        + "price=" + str(latest['1a. price (USD)']) + ","
        + "volume=" + str(latest['2. volume'])
        + " " + unix_ns)
    #append the line and close the file so the Tail plugin sees it right away
    with open('Data/tail.txt', 'a+') as thefile:
        thefile.write("%s\n" % line)
    print("line added")
    #alpha vantage only adds points every 5 min, so set script to sleep for 5 min as well
    time.sleep(300)

Then I configure my Telegraf config file, which contains the Tail Input and the InfluxDB Output, with these changes:

  • Omit the “host” tag (line 78): omit_hostname = true
  • Specify server by uncommenting (line 93): urls = ["http://127.0.0.1:8086"] # required
  • Create a target database to write the data to (line 96): database = "tail"
  • Provide the path to tail.txt (line 241): files = ["/Users/anaisdotis-georgiou/Desktop/GettingStarted_StreamingData/Data/tail.txt"]
  • Set the “Read from Beginning” parameter to false so that Tail reads the appended data, like the unix command tail -F --lines=0 myfile.log (line 243): from_beginning = false
  • Specify the data format to consume. Since the data has been converted to line protocol, influx is chosen (line 254): data_format = "influx"
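The effect of from_beginning = false can be pictured with a small Python sketch that opens a file, skips what is already there, and then reads only lines appended afterwards. This is an illustration of the follow-from-the-end behavior, not how Telegraf implements the Tail plugin, and the helper names are hypothetical.

```python
import os


def open_at_end(path):
    """Open a file for reading and skip everything already in it."""
    handle = open(path, "r")
    handle.seek(0, os.SEEK_END)
    return handle


def read_new_lines(handle):
    """Return the lines appended since the last read, without trailing newlines."""
    return [line.rstrip("\n") for line in handle.readlines()]


def append_line(path, line):
    """Append one line and close the file so the data is flushed to disk."""
    with open(path, "a") as out:
        out.write(line + "\n")
```

A reader opened with open_at_end ignores lines that were in the file before it started, which is why points written before Telegraf starts are not picked up when from_beginning is false.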

Exec vs. Tail and When You Should Use Them

I hope you’re starting to feel more comfortable using Telegraf and InfluxDB. If you have any questions, please post them on the community site or tweet us @InfluxDB. Thanks!

Developer Advocate at InfluxData