Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions can/io/asc.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ def __init__(
self,
file: StringPathLike | TextIO,
channel: int = 1,
timestamps_format: str = "absolute",
**kwargs: Any,
) -> None:
"""
Expand All @@ -366,7 +367,21 @@ def __init__(
write mode, not binary write mode.
:param channel: a default channel to use when the message does not
have a channel set
:param timestamps_format: the format of timestamps in the header.
Use ``"absolute"`` (default) so that readers can recover
the original wall-clock timestamps by combining the
per-message offset with the trigger-block start time.
Use ``"relative"`` when only the elapsed time from the
start of the recording matters and no absolute time
recovery is needed.
:raises ValueError: if *timestamps_format* is not ``"absolute"`` or
``"relative"``
"""
if timestamps_format not in ("absolute", "relative"):
raise ValueError(
f"timestamps_format must be 'absolute' or 'relative', "
f"got {timestamps_format!r}"
)
if kwargs.get("append", False):
raise ValueError(
f"{self.__class__.__name__} is currently not equipped to "
Expand All @@ -375,11 +390,12 @@ def __init__(
super().__init__(file, mode="w")

self.channel = channel
self.timestamps_format = timestamps_format

# write start of file header
start_time = self._format_header_datetime(datetime.now())
self.file.write(f"date {start_time}\n")
self.file.write("base hex timestamps absolute\n")
self.file.write(f"base hex timestamps {self.timestamps_format}\n")
self.file.write("internal events logged\n")

# the last part is written with the timestamp of the first message
Expand Down Expand Up @@ -426,10 +442,22 @@ def log_event(self, message: str, timestamp: float | None = None) -> None:
# Use last known timestamp if unknown
if timestamp is None:
timestamp = self.last_timestamp
# turn into relative timestamps if necessary
if timestamp >= self.started:
timestamp -= self.started
line = self.FORMAT_EVENT.format(timestamp=timestamp, message=message)
# Compute written timestamp based on configured format
if self.timestamps_format == "absolute":
# offsets from the start of measurement
written_timestamp = (
timestamp - self.started if timestamp >= self.started else timestamp
)
else:
# deltas from the preceding event
written_timestamp = (
timestamp - self.last_timestamp
if timestamp >= self.last_timestamp
else 0.0
)
# Track last timestamp so the next event can compute its delta
self.last_timestamp = timestamp
line = self.FORMAT_EVENT.format(timestamp=written_timestamp, message=message)
self.file.write(line)

def on_message_received(self, msg: Message) -> None:
Expand Down
4 changes: 4 additions & 0 deletions doc/changelog.d/2022.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Added `timestamps_format` parameter to `ASCWriter` to support configurable timestamp
format: `"absolute"` (default, timestamps are offsets from the start of measurement)
or `"relative"` (each timestamp is the delta from the preceding event), matching the
semantics described in the ASC format specification.
88 changes: 88 additions & 0 deletions test/logformats_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

TODO: correctly set preserves_channel and adds_default_channel
"""

import locale
import logging
import os
Expand Down Expand Up @@ -680,6 +681,93 @@ def test_write(self):

self.assertEqual(expected_file.read_text(), actual_file.read_text())

def test_write_timestamps_format_default_is_absolute(self):
"""ASCWriter should write 'timestamps absolute' in the header by default."""
with can.ASCWriter(self.test_file_name) as writer:
pass

content = Path(self.test_file_name).read_text()
self.assertIn("timestamps absolute", content)

def test_write_timestamps_format_relative(self):
"""ASCWriter should write 'timestamps relative' when requested."""
with can.ASCWriter(self.test_file_name, timestamps_format="relative") as writer:
pass

content = Path(self.test_file_name).read_text()
self.assertIn("timestamps relative", content)
self.assertNotIn("timestamps absolute", content)

def test_write_timestamps_format_invalid(self):
"""ASCWriter should raise ValueError for an unsupported timestamps_format."""
with self.assertRaises(ValueError):
can.ASCWriter(self.test_file_name, timestamps_format="unix")

def test_write_relative_timestamp_roundtrip(self):
"""Messages written with relative format round-trip with relative timestamps."""
msgs = [
can.Message(timestamp=100.0, arbitration_id=0x1, data=b"\x01"),
can.Message(timestamp=100.5, arbitration_id=0x2, data=b"\x02"),
]

with can.ASCWriter(self.test_file_name, timestamps_format="relative") as writer:
for m in msgs:
writer.on_message_received(m)

with can.ASCReader(self.test_file_name, relative_timestamp=True) as reader:
result = list(reader)

self.assertEqual(len(result), len(msgs))
# Timestamps in file are per-event deltas; reader reads them as-is
self.assertAlmostEqual(result[0].timestamp, 0.0, places=5)
self.assertAlmostEqual(result[1].timestamp, 0.5, places=5)

def test_write_relative_timestamps_are_per_event_deltas(self):
"""With timestamps_format='relative', each written timestamp is a delta from the
preceding event (not an offset from measurement start)."""
msgs = [
can.Message(timestamp=100.0, arbitration_id=0x1, data=b"\x01"),
can.Message(timestamp=100.3, arbitration_id=0x2, data=b"\x02"),
can.Message(timestamp=101.0, arbitration_id=0x3, data=b"\x03"),
]

with can.ASCWriter(self.test_file_name, timestamps_format="relative") as writer:
for m in msgs:
writer.on_message_received(m)

with can.ASCReader(self.test_file_name, relative_timestamp=True) as reader:
result = list(reader)

self.assertEqual(len(result), len(msgs))
# msg1: 0.0 (delta from "Start of measurement" at same time)
# msg2: 0.3 (delta from msg1)
# msg3: 0.7 (delta from msg2 — NOT 1.0, which would be absolute offset)
self.assertAlmostEqual(result[0].timestamp, 0.0, places=5)
self.assertAlmostEqual(result[1].timestamp, 0.3, places=5)
self.assertAlmostEqual(result[2].timestamp, 0.7, places=5)

def test_write_absolute_timestamps_are_offsets_from_start(self):
"""With timestamps_format='absolute' (default), each written timestamp is an
offset from the measurement start, not a per-event delta."""
msgs = [
can.Message(timestamp=100.0, arbitration_id=0x1, data=b"\x01"),
can.Message(timestamp=100.3, arbitration_id=0x2, data=b"\x02"),
can.Message(timestamp=101.0, arbitration_id=0x3, data=b"\x03"),
]

with can.ASCWriter(self.test_file_name, timestamps_format="absolute") as writer:
for m in msgs:
writer.on_message_received(m)

with can.ASCReader(self.test_file_name, relative_timestamp=True) as reader:
result = list(reader)

self.assertEqual(len(result), len(msgs))
# All timestamps are offsets from the measurement start (100.0):
self.assertAlmostEqual(result[0].timestamp, 0.0, places=5)
self.assertAlmostEqual(result[1].timestamp, 0.3, places=5)
self.assertAlmostEqual(result[2].timestamp, 1.0, places=5)

@parameterized.expand(
[
(
Expand Down