Write Data
Use the Python client to write to a Synnax cluster.
Synnax supports multiple methods for writing data to a cluster. We can write directly to a channel, fetch a range and set its data, or leverage writers for writing large volumes of data.
Writes in Synnax are more complicated than reads, so we recommend reading our concepts page to learn the best practices for writing data to Synnax.
Writing to a Channel
Writing to a channel requires us to write timestamps to its index channel before we write the data. We’ll create the following channels to use as examples:
import synnax as sy

# `client` is assumed to be an already-connected Synnax client.

# Create the index
timestamps = client.channels.create(
    name="timestamps",
    data_type=sy.DataType.TIMESTAMP,
    is_index=True
)
# Create the temperature channel
my_precise_tc = client.channels.create(
    name="my_precise_tc",
    data_type=sy.DataType.FLOAT32,
    index=timestamps.key
)
# Create the pressure channel
my_precise_pt = client.channels.create(
    name="my_precise_pt",
    data_type=sy.DataType.FLOAT32,
    index=timestamps.key
)
We’ll make sure to write timestamps to the index before we write to the data channel:
# Our temperature data.
temperatures = [55, 55.1, 55.7, 57.2, 58.1]
start = sy.TimeStamp.now()
times = [
    start,
    start + 1 * sy.TimeSpan.HOUR,
    start + 2 * sy.TimeSpan.HOUR,
    start + 3 * sy.TimeSpan.HOUR,
    start + 4 * sy.TimeSpan.HOUR,
]
# Write the timestamps to the index
timestamps.write(start, times)
# Write the data to the channel
my_precise_tc.write(start, temperatures)
Notice how we align the two arrays using the common start timestamp. This tells Synnax that the first sample in the temperatures array is associated with the first timestamp in the timestamps array.
Synnax will raise a ValidationError if the index channel does not contain a corresponding timestamp for every sample in the data channel. After all, it wouldn’t make sense to have a temperature reading without an associated timestamp.
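The alignment rule above can also be checked locally before anything is sent to the cluster. The sketch below is plain Python and does not call the Synnax client. The helpers evenly_spaced_ns and check_aligned are hypothetical names introduced for illustration, and the sketch assumes timestamps are represented as integer nanoseconds since the Unix epoch.

```python
import time

HOUR_NS = 3_600 * 1_000_000_000  # one hour in nanoseconds


def evenly_spaced_ns(start_ns: int, step_ns: int, count: int) -> list:
    """Return `count` timestamps starting at `start_ns`, `step_ns` apart."""
    return [start_ns + i * step_ns for i in range(count)]


def check_aligned(times: list, samples: list) -> None:
    """Raise ValueError unless there is exactly one timestamp per sample."""
    if len(times) != len(samples):
        raise ValueError(
            f"{len(samples)} samples but {len(times)} timestamps"
        )


temperatures = [55, 55.1, 55.7, 57.2, 58.1]
start_ns = time.time_ns()
# One timestamp per temperature reading, spaced one hour apart.
times = evenly_spaced_ns(start_ns, HOUR_NS, len(temperatures))
check_aligned(times, temperatures)  # passes: five timestamps, five samples
```

Running a check like this first turns a server-side validation failure into a local error that points at the mismatched array lengths.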
Writing to a Range
Writing to a range takes away the burden of needing to correctly align the data from different channels.
We’ll create the following range as an example:
import synnax as sy

# Create the range
burst_test = client.ranges.create(
    name="burst_test",
    time_range=sy.TimeRange(
        start=sy.TimeStamp.now(),
        end=sy.TimeStamp.now() + 1 * sy.TimeSpan.HOUR
    )
)
Then, we’ll write to the range using the write method:
temperatures = [55, 55.1, 55.7, 57.2, 58.1, 58.9, 59.1, 59.2, 59.3]
pressures = [100, 100.1, 100.7, 102.2, 103.1, 103.9, 104.1, 104.2, 104.3]
# This call to write will assume that the timestamp of the first sample is
# the start of the range.
burst_test.write({
    "my_precise_tc": temperatures,
    "my_precise_pt": pressures,
})
Using a Writer
While the above methods are great for writing static, existing data, it’s common to write data in a streaming fashion as it’s acquired. This is especially useful in control sequences and live data processing. The Writer class is designed for this use case (and is actually used under the hood by the other methods).
To keep things intuitive, the writer maintains a file-like interface that is similar to Python’s built-in file objects. There are a few key differences, the most important being that writers are governed by a transaction. If you’d like to learn more about transactions and how writes work in Synnax, check out the concepts page.
We’ll create the following channels to use as examples:
import synnax as sy

# Create the index
timestamps = client.channels.create(
    name="timestamps",
    data_type=sy.DataType.TIMESTAMP,
    is_index=True
)
# Create the temperature channel, indexed by the key of the timestamps channel
my_precise_tc = client.channels.create(
    name="my_precise_tc",
    data_type=sy.DataType.FLOAT32,
    index=timestamps.key
)
To open the writer, we use the open_writer method on the client and provide a starting timestamp for the first sample and a list of channels we’d like to write to:
import time

with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["timestamps", "my_precise_tc"],
) as writer:
    for i in range(100):
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })
        time.sleep(0.1)
    # Commit once, after all samples have been written
    writer.commit()
This example will write 100 samples to the my_precise_tc channel, each spaced roughly 0.1 seconds apart, and will commit all writes when finished.
It’s typical to write and commit millions of samples over the course of hours or days, intermittently calling commit to ensure that the data is persisted to the cluster.
We recommend using writers within a context manager. This guarantees that the writer is properly closed after use, so that its resources are freed and its sockets are closed.
If you can’t use a context manager, make sure you call writer.close() when you’re done using it.
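When a context manager is not practical, a try/finally block gives the same guarantee that close() runs. The sketch below uses a hypothetical StubWriter stand-in so that it runs without a cluster. With a real client, the object would instead come from client.open_writer(...).

```python
class StubWriter:
    """Hypothetical stand-in for a Synnax writer; it only records calls."""

    def __init__(self) -> None:
        self.frames = []
        self.committed = 0
        self.closed = False

    def write(self, frame: dict) -> None:
        if self.closed:
            raise RuntimeError("writer is closed")
        self.frames.append(frame)

    def commit(self) -> None:
        self.committed = len(self.frames)

    def close(self) -> None:
        self.closed = True


writer = StubWriter()  # with a real client: writer = client.open_writer(...)
try:
    for i in range(3):
        writer.write({"my_precise_tc": i})
    writer.commit()
finally:
    # Runs whether or not the loop raised, so the writer is always released.
    writer.close()
```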
Auto-Commit
You can also configure a writer to automatically commit written data after each write, making it immediately available for read access. To do this, set the enable_auto_commit argument to True when opening the writer:
import time

with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["timestamps", "my_precise_tc"],
    enable_auto_commit=True,
) as writer:
    for i in range(100):
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })
        time.sleep(0.1)
Write Authorities
Writers support dynamic control handoff. Multiple writers can be opened on a channel at the same time, but only one writer is allowed to write to the channel. To determine which writer has control, an authority from 0 to 255 is assigned to each writer (or, optionally, each channel in the writer). The writer with the highest authority will be allowed to write. If two writers have the same authority, the writer that opened first will be allowed to write. For more information, see the concepts page on writers.
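The arbitration rule described above (highest authority wins, ties go to the writer that opened first) can be modeled in a few lines of plain Python. This is only an illustration of the rule, not how the cluster implements it. OpenWriter and controlling_writer are hypothetical names, as are the example writer names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OpenWriter:
    """Hypothetical record of a writer opened on one channel."""
    name: str
    authority: int  # 0 to 255
    opened_at: int  # open order; a lower number means opened earlier


def controlling_writer(writers: list) -> OpenWriter:
    """Pick the writer allowed to write: highest authority, earliest on ties."""
    for w in writers:
        if not 0 <= w.authority <= 255:
            raise ValueError(f"authority out of range: {w.authority}")
    # Negating the authority makes min() prefer higher authority first,
    # then the earlier open order among equal authorities.
    return min(writers, key=lambda w: (-w.authority, w.opened_at))


writers = [
    OpenWriter("daq", authority=100, opened_at=0),
    OpenWriter("sequence", authority=200, opened_at=1),
    OpenWriter("operator", authority=200, opened_at=2),
]
winner = controlling_writer(writers)
print(winner.name)  # prints "sequence"
```

Here "sequence" and "operator" share the highest authority, so control goes to "sequence" because it was opened first.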
By default, writers are opened with an authority of ABSOLUTE (i.e., 255). This means that no other writers can write to the channel as long as the writer is open.
Opening a writer with the same authority on all channels
To open a writer with the same authority on all channels, you can pass the authority argument with an integer.
import time

with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["timestamps", "my_precise_tc"],
    authority=100,
) as writer:
    for i in range(100):
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })
        time.sleep(0.1)
Opening a writer with different authorities on each channel
To open a writer with different authorities on each channel, you can pass the authority argument with a list of integers. This list must be the same length as the number of channels in the writer.
import time

with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["timestamps", "my_precise_tc"],
    # One authority per channel, in the same order as `channels`
    authority=[100, 200],
) as writer:
    for i in range(100):
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })
        time.sleep(0.1)
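Because the authority list is matched to the channel list by position, a length mismatch is easy to introduce when channels are added later. A small local check before opening the writer can catch it. The helper pair_authorities below is hypothetical and is not part of the Synnax client.

```python
from typing import Union


def pair_authorities(channels: list, authority: Union[int, list]) -> dict:
    """Map each channel name to its authority, validating length and range."""
    if isinstance(authority, int):
        # A single integer applies to every channel.
        levels = [authority] * len(channels)
    else:
        levels = list(authority)
    if len(levels) != len(channels):
        raise ValueError(
            f"{len(channels)} channels but {len(levels)} authorities"
        )
    for level in levels:
        if not 0 <= level <= 255:
            raise ValueError(f"authority out of range: {level}")
    return dict(zip(channels, levels))


channels = ["timestamps", "my_precise_tc"]
print(pair_authorities(channels, [100, 200]))
# {'timestamps': 100, 'my_precise_tc': 200}
```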
Adjusting write authorities after open
To change the authority of a writer during operation, call set_authority. This method accepts either a single integer, which applies to every channel in the writer, or a dictionary with the channel name as the key and the new authority as the value.
# Set the authority on all channels
writer.set_authority(200)
# Set the authority on just a few channels
writer.set_authority({
    "timestamps": 150,
    "my_precise_tc": 250,
})
Persistence/Streaming Mode
By default, writers are opened in stream + persist mode.
To change the mode of a writer, pass the mode argument when opening the writer. This can be persist, stream, or persist_stream.
For example, to open a writer that only persists data:
import synnax as sy

with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["timestamps", "my_precise_tc"],
    mode="persist"
) as writer:
    for i in range(100):
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })
Common Pitfalls
There are several common pitfalls when writing data to a Synnax cluster. Avoiding them matters because they can lead to performance degradation and/or control issues.
Using Many Individual Write Calls Instead of a Writer
When writing large volumes of data in a streaming fashion (or in batches), it’s important to use a writer instead of making individual write calls to a channel. Calls to write on a channel use an entirely new transaction for each call, and constantly creating, committing, and closing transactions has a dramatic impact on performance. So, don’t do this:
# Named `timestamps` rather than `time` to avoid shadowing the time module
timestamps = client.channels.retrieve("timestamps")
my_tc = client.channels.retrieve("my_precise_tc")
for i in range(100):
    # This is a very bad idea
    ts = sy.TimeStamp.now()
    timestamps.write(ts, ts)
    my_tc.write(ts, i)
This is also a bad idea:
# Opens and closes a transaction for every write
for i in range(100):
    with client.open_writer(
        start=sy.TimeStamp.now(),
        channels=["timestamps", "my_precise_tc"],
        enable_auto_commit=True,
    ) as writer:
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })
Instead, repeatedly call write on a single writer:
# This is dramatically more efficient
with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["timestamps", "my_precise_tc"],
    enable_auto_commit=True,
) as writer:
    for i in range(100):
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })
Calling Commit on Every Write
If you’re not using auto-commit, it’s important to call commit on the writer periodically to ensure that the data is persisted to the cluster. However, calling commit on every write is a bad idea, because each commit requires a round-trip to the cluster to confirm that the data is persisted, which becomes very slow at high write volumes. If you’re writing a lot of data, commit every few seconds or turn on auto-commit.
This is a bad idea:
with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["timestamps", "my_precise_tc"],
) as writer:
    for i in range(100):
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })
        # Committing inside the loop forces a round-trip on every write
        writer.commit()
Instead, use auto-commit:
with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["timestamps", "my_precise_tc"],
    enable_auto_commit=True,
) as writer:
    for i in range(100):
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })
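If auto-commit is not an option, the advice to commit every few seconds could be implemented with a simple time-based throttle. The sketch below is one possible approach, not a Synnax feature. StubWriter is a hypothetical stand-in that only counts calls, and write_with_periodic_commit and its clock parameter are names introduced here for illustration.

```python
import time


class StubWriter:
    """Hypothetical stand-in for a Synnax writer; it only counts calls."""

    def __init__(self) -> None:
        self.writes = 0
        self.commits = 0

    def write(self, frame: dict) -> None:
        self.writes += 1

    def commit(self) -> None:
        self.commits += 1


def write_with_periodic_commit(writer, frames, interval_s, clock=time.monotonic):
    """Write every frame, committing at most once per `interval_s` seconds."""
    last_commit = clock()
    for frame in frames:
        writer.write(frame)
        if clock() - last_commit >= interval_s:
            writer.commit()
            last_commit = clock()
    # Final commit so the tail of the data is persisted too.
    writer.commit()


writer = StubWriter()
frames = ({"my_precise_tc": i} for i in range(100))
write_with_periodic_commit(writer, frames, interval_s=5.0)
print(writer.writes, writer.commits)
```

A monotonic clock is used so that system clock adjustments do not trigger or suppress commits, and the clock is injectable so the throttle can be tested without sleeping.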