Write Data
Use the TypeScript client to write to a Synnax cluster.
The Synnax TypeScript client supports multiple methods for writing data to a cluster. We can write directly to a channel, or we can write to multiple channels in a streaming fashion using a writer.
Writing to a Channel
Writing to a channel requires us to write timestamps to its index channel before we write the data. We’ll create the following channels to use as examples:
import { DataType } from "@synnaxlabs/client";
const timeChannel = await client.channels.create({
name: "time",
dataType: DataType.TIMESTAMP,
isIndex: true,
});
const temperatureChannel = await client.channels.create({
name: "temperature",
dataType: DataType.FLOAT32,
index: timeChannel.key,
});
We can then write data to timeChannel and temperatureChannel. We need to write to the index channel before writing to the regular channel, so we will write to timeChannel first. Writing data via the channel’s write method requires a typed array:
import { TimeStamp, TimeSpan } from "@synnaxlabs/client";
const start = TimeStamp.now();
const timestamps = new BigInt64Array([
start.valueOf(), // valueOf() converts a TimeStamp to a bigint.
start.add(TimeSpan.seconds(1)).valueOf(),
start.add(TimeSpan.seconds(2)).valueOf(),
start.add(TimeSpan.seconds(3)).valueOf(),
start.add(TimeSpan.seconds(4)).valueOf(),
]);
const temperatures = new Float32Array([20.0, 20.1, 20.2, 20.3, 20.4]);
// Write the timestamps to the index first
await timeChannel.write(start, timestamps);
// Then write the data
await temperatureChannel.write(start, temperatures);
Notice how we align the two arrays by using the common start timestamp. This tells Synnax that the first sample in the temperatures array is associated with the first timestamp in the timestamps array, and so on.
Synnax will raise a ValidationError if the index does not contain a corresponding timestamp for every sample in the data array. After all, it wouldn’t make sense to have a temperature reading without an associated timestamp.
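The alignment rule can also be checked on the client side before anything is sent to the cluster. The following is a minimal sketch, not part of the Synnax client: evenlySpacedTimestamps and assertAligned are hypothetical helpers, and the sketch assumes timestamps are stored as bigint nanoseconds since the Unix epoch. A matching length is only a rough local check; the cluster remains the authority on whether the index covers every sample.

```typescript
// Hypothetical helpers for illustration; not part of @synnaxlabs/client.
// Assumption: timestamps are bigint nanoseconds since the Unix epoch.
const NANOS_PER_SECOND = BigInt(1_000_000_000);

// Build `count` timestamps starting at `startNanos`, spaced `stepNanos` apart.
function evenlySpacedTimestamps(
  startNanos: bigint,
  stepNanos: bigint,
  count: number,
): BigInt64Array {
  return BigInt64Array.from(
    { length: count },
    (_, i) => startNanos + BigInt(i) * stepNanos,
  );
}

// Throw early if the index and data arrays cannot line up sample-for-sample.
function assertAligned(timestamps: BigInt64Array, samples: Float32Array): void {
  if (timestamps.length !== samples.length) {
    throw new Error(
      `expected ${samples.length} timestamps, got ${timestamps.length}`,
    );
  }
}

// Usage: five samples, one second apart, starting now (milliseconds to nanoseconds).
const startNanos = BigInt(Date.now()) * BigInt(1_000_000);
const alignedTimestamps = evenlySpacedTimestamps(startNanos, NANOS_PER_SECOND, 5);
const alignedTemperatures = new Float32Array([20.0, 20.1, 20.2, 20.3, 20.4]);
assertAligned(alignedTimestamps, alignedTemperatures);
```

Running a check like this before calling write surfaces a length mismatch locally instead of as a ValidationError from the cluster.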
Using a Writer
While the above methods are great for writing static, existing data, it’s common to want to write data in a streaming fashion as it’s acquired. This is useful for control sequences and live data processing. The Writer class is designed to handle this use case (and is actually used under the hood by the above methods).
To keep things intuitive, we’ve designed the writer API around a file-like interface. There are a few key differences, the most important being that writers are governed by a transaction. If you’d like to learn more about transactions, see the concepts page.
We’ll create the following channels to use as examples:
import { DataType } from "@synnaxlabs/client";
const timeChannel = await client.channels.create({
name: "time",
dataType: DataType.TIMESTAMP,
isIndex: true,
});
const temperatureChannel = await client.channels.create({
name: "temperature",
dataType: DataType.FLOAT32,
index: timeChannel.key,
});
To open the writer, we’ll use the openWriter method on the client:
import { TimeStamp, TimeSpan, Series, Frame } from "@synnaxlabs/client";
const writer = await client.openWriter({
start: TimeStamp.now(),
channels: ["time", "temperature"]
});
try {
for (let i = 0; i < 100; i++) {
const start = TimeStamp.now();
const timeSeries = BigInt64Array.from({ length: 10 }, (_, i) => (
start.add(TimeSpan.milliseconds(i)).valueOf()
))
const dataSeries = Float32Array.from({ length: 10 }, (_, i) => (
Math.sin(i / 100)
))
await writer.write(new Frame({
[timeChannel.key]: new Series(timeSeries),
[temperatureChannel.key]: new Series(dataSeries)
}));
await new Promise(resolve => setTimeout(resolve, 100));
}
await writer.commit();
} finally {
await writer.close()
}
This example will write 100 batches of 10 samples to the temperature channel, each roughly 100ms apart, and will commit all writes when finished.
It’s typical to write and commit millions of samples over the course of hours or days, intermittently calling commit to ensure that the data is safely stored in the cluster.
Closing the Writer
It’s very important to free the writer resources when finished by calling the close method. If close is not called when you are done with the writer, other writers may not be able to write to the same channels. We typically recommend placing the writer operations inside a try-finally block to ensure that the writer is always closed.
Different Ways of Writing Data
There are a number of argument formats that the write method accepts. Use the one that best fits your use case.
// Write a single sample for a channel
await writer.write("temperature", 20.0);
// Write multiple samples for a channel
await writer.write("temperature", [20.0, 20.1, 20.2, 20.3, 20.4]);
// Write a single sample for several channels
await writer.write({
time: TimeStamp.now(),
temperature: 20.0,
});
// Write multiple samples for several channels
const start = TimeStamp.now();
await writer.write({
time: [start, start.add(TimeSpan.seconds(1))],
temperature: [20.0, 20.1],
});
// Write typed arrays for several channels
await writer.write(
new Frame({
[timeChannel.key]: new BigInt64Array([
start.valueOf(),
start.add(TimeSpan.seconds(1)).valueOf(),
]),
[temperatureChannel.key]: new Float32Array([20.0, 20.1]),
}),
);
Auto-Commit
You can configure a writer to automatically commit written data after each write, making it immediately available for read access. To do this, set the enableAutoCommit option to true when opening the writer:
import { TimeStamp, Series, Frame } from "@synnaxlabs/client";
const writer = await client.openWriter({
start: TimeStamp.now(),
channels: ["time", "temperature"],
enableAutoCommit: true,
});
try {
for (let i = 0; i < 100; i++) {
const start = TimeStamp.now();
await writer.write(
new Frame({
time: start,
temperature: Math.sin(i / 100),
}),
);
await new Promise((resolve) => setTimeout(resolve, 100));
}
} finally {
await writer.close();
}
Write Authorities
Writers support dynamic control handoff. Multiple writers can be opened on a channel at the same time, but only one writer is allowed to write to the channel. To determine which writer has control, an authority from 0 to 255 is assigned to each writer (or, optionally, each channel in the writer). The writer with the highest authority will be allowed to write. If two writers have the same authority, the writer that opened first will be allowed to write. For more information, see the concepts page on writers.
By default, writers are opened with an authority of ABSOLUTE, i.e. 255. This means that no other writers can write to the channel as long as the writer is open.
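To make the arbitration rule concrete, here is a small illustrative model of it. This is a sketch only: the real decision is made inside the Synnax cluster, and OpenWriter, openedAt, and controllingWriter are hypothetical names introduced for this example. It encodes the two rules stated above: the highest authority wins, and ties go to the writer that was opened first.

```typescript
// Illustrative model only; the real arbitration happens inside the cluster.
// All names here are hypothetical and not part of @synnaxlabs/client.
interface OpenWriter {
  name: string; // label used only in this example
  authority: number; // 0 to 255
  openedAt: number; // open order; lower means opened earlier
}

const ABSOLUTE_AUTHORITY = 255;

// Return the writer that would be allowed to write to a shared channel.
function controllingWriter(writers: OpenWriter[]): OpenWriter | undefined {
  let winner: OpenWriter | undefined = undefined;
  for (const w of writers) {
    const higher = winner !== undefined && w.authority > winner.authority;
    const tieButEarlier =
      winner !== undefined &&
      w.authority === winner.authority &&
      w.openedAt < winner.openedAt;
    if (winner === undefined || higher || tieButEarlier) winner = w;
  }
  return winner;
}

// Usage: two writers tie at 200, so the one opened first keeps control.
const contenders: OpenWriter[] = [
  { name: "logger", authority: 100, openedAt: 1 },
  { name: "sequence", authority: 200, openedAt: 2 },
  { name: "operator", authority: 200, openedAt: 3 },
];
const inControl = controllingWriter(contenders);
console.log(inControl ? inControl.name : "none");
```

In this model, a writer opened with ABSOLUTE_AUTHORITY would take control from all three contenders, since none of them has an equal or higher authority.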
Opening a writer with the same authority on all channels
To open a writer with the same authority on all channels, you can pass the authority argument with an integer.
import { TimeStamp, Series, Frame } from "@synnaxlabs/client";
const writer = await client.openWriter({
start: TimeStamp.now(),
channels: ["time", "temperature"],
authority: 100,
});
Opening a writer with different authorities on each channel
To open a writer with different authorities on each channel, you can pass the authority argument with a list of integers. This list must be the same length as the number of channels in the writer.
import { TimeStamp, Series, Frame } from "@synnaxlabs/client";
const writer = await client.openWriter({
start: TimeStamp.now(),
channels: ["time", "temperature"],
authority: [100, 200],
});
Adjusting write authorities after open
To change the authority of a writer during operation, you can use the setAuthority method:
// Set the authority on all channels
await writer.setAuthority(200);
// Set the authority on just a few channels
await writer.setAuthority({
time: 200,
temperature: 100,
});
Persistence/Streaming Mode
By default, writers are opened in stream + persist mode. To change the mode of a writer, specify the value of the mode argument when opening the writer. This can be persist, stream, or persistStream.
For example, to open a writer that only persists data:
import { TimeStamp, Series, Frame, WriterMode } from "@synnaxlabs/client";
const writer = await client.openWriter({
start: TimeStamp.now(),
channels: ["time", "temperature"],
mode: "persist",
});
Common Pitfalls
There are several common pitfalls to avoid when writing data to a Synnax cluster. These are important to avoid as they can lead to performance degradation and/or control issues.
Using Many Individual Write Calls Instead of a Writer
When writing large volumes of data in a streaming fashion (or in batches), it’s important to use a writer instead of making individual write calls to a channel. Each call to write on a channel uses an entirely new transaction, and constantly creating, committing, and closing transactions has a dramatic impact on performance. So, don’t do this:
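const time = await client.channels.retrieve("timestamps");
const myTc = await client.channels.retrieve("my_precise_tc");
// This is a very bad idea
for (let i = 0; i < 100; i++) {
const ts = TimeStamp.now();
await time.write(ts, new BigInt64Array([ts.valueOf()]));
// Assumes my_precise_tc is a float32 channel
await myTc.write(ts, new Float32Array([i]));
}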
This is also a bad idea:
for (let i = 0; i < 100; i++) {
const writer = await client.openWriter({
start: TimeStamp.now(),
channels: ["time", "temperature"],
enableAutoCommit: true,
});
await writer.write({
time: TimeStamp.now(),
temperature: Math.sin(i / 100),
});
await writer.close();
}
Instead, repeatedly call write on a single writer:
// This is dramatically more efficient
const writer = await client.openWriter({
start: TimeStamp.now(),
channels: ["time", "temperature"],
enableAutoCommit: true
});
try {
for (let i = 0; i < 100; i++)
await writer.write({
"time": TimeStamp.now(),
"temperature": Math.sin(i / 100)
});
} finally {
await writer.close();
}
Calling Commit on Every Write
If you’re not using auto-commit, it’s important to call commit on the writer periodically to ensure that the data is persisted to the cluster. However, calling commit on every write is a bad idea, because each commit requires a round-trip to the cluster to confirm that the data is persisted. This can be very slow when writing a lot of data. In that case, commit every few seconds or turn on auto-commit.
This is a bad idea:
const writer = await client.openWriter({
start: TimeStamp.now(),
channels: ["time", "temperature"],
});
try {
for (let i = 0; i < 100; i++) {
await writer.write({
time: TimeStamp.now(),
temperature: Math.sin(i / 100),
});
await writer.commit();
}
} finally {
await writer.close();
}
Instead, use auto-commit:
const writer = await client.openWriter({
start: TimeStamp.now(),
channels: ["time", "temperature"],
enableAutoCommit: true,
});
try {
for (let i = 0; i < 100; i++)
await writer.write({
time: TimeStamp.now(),
temperature: Math.sin(i / 100),
});
} finally {
await writer.close();
}
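If you prefer to keep manual commits, the "commit every few seconds" approach can be sketched with a small time-based gate. This is a hedged illustration rather than anything provided by the Synnax client: CommitThrottle and shouldCommit are hypothetical names, and the interval of two seconds is an arbitrary choice for the example.

```typescript
// Hypothetical helper for illustration; not part of @synnaxlabs/client.
// Decides when enough time has passed to justify another commit round-trip.
class CommitThrottle {
  private readonly intervalMs: number;
  private lastCommitMs: number;

  constructor(intervalMs: number, startMs: number = Date.now()) {
    this.intervalMs = intervalMs;
    this.lastCommitMs = startMs;
  }

  // Returns true (and records the time) once at least intervalMs has elapsed
  // since the last commit; returns false otherwise.
  shouldCommit(nowMs: number = Date.now()): boolean {
    if (nowMs - this.lastCommitMs < this.intervalMs) return false;
    this.lastCommitMs = nowMs;
    return true;
  }
}

// Usage with explicit clock values so the behaviour is easy to follow.
const throttle = new CommitThrottle(2000, 0);
console.log(throttle.shouldCommit(500)); // false: only 0.5 s since start
console.log(throttle.shouldCommit(2000)); // true: 2 s elapsed
console.log(throttle.shouldCommit(2500)); // false: only 0.5 s since last commit
```

Inside a write loop, the gate would wrap the commit call, for example by calling writer.commit() only when throttle.shouldCommit() returns true, with a final commit before the writer is closed.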