# Yet another attempt to solve TTC errors... #203
base: develop
## Conversation
### Reviewer's Guide by Sourcery

This pull request refactors the way accelerometer data is saved and processed in ShakeTune. It introduces a dedicated writer process to handle disk writes, which prevents blocking I/O and improves performance. The changes also simplify the graph creation process and ensure that data is saved correctly.

**Sequence diagram for saving accelerometer data**

```mermaid
sequenceDiagram
participant ST as ShakeTune
participant MM as MeasurementsManager
participant WP as WriterProcess
participant Queue as WriterQueue
participant Disk
ST->>MM: add_measurement(name, samples)
alt len(measurements) > chunk_size
MM->>Queue: put(measurement)
activate WP
Queue->>WP: meas
WP->>Disk: write(meas)
deactivate WP
end
ST->>MM: save_stdata(filename)
MM->>Queue: put(STOP_SENTINEL)
WP->>Disk: flush()
Disk-->>WP: done
WP-->>MM: done
MM-->>ST: done
```
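For orientation, here is a minimal, self-contained sketch of the queue-plus-sentinel pattern the diagram describes. It omits the zstd compression and the shared `is_writing` flag used by the actual PR; the file name and sample payload are illustrative.

```python
import json
from multiprocessing import Process, Queue

STOP_SENTINEL = 'STOP_SENTINEL'

def writer_loop(queue: Queue, output_file: str) -> None:
    # Runs in the dedicated writer process: drain the queue and write each
    # measurement as one JSON line, until the stop sentinel arrives.
    with open(output_file, 'w', encoding='utf-8') as f:
        while True:
            meas = queue.get()
            if meas == STOP_SENTINEL:
                break
            f.write(json.dumps(meas) + '\n')

if __name__ == '__main__':
    q = Queue()
    writer = Process(target=writer_loop, args=(q, 'measurements.jsonl'))
    writer.start()
    q.put({'name': 'demo', 'samples': [[0.0, 0.1, 0.2, 9.8]]})
    q.put(STOP_SENTINEL)
    writer.join()  # the caller blocks only at shutdown; normal puts never touch the disk
```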
**Updated class diagram for MeasurementsManager**

```mermaid
classDiagram
class MeasurementsManager {
-chunk_size: int
-k_reactor
-measurements: List~Measurement~
-temp_file: Path
-writer_queue: Queue
-is_writing: Value
-writer_process: Optional~Process~
+__init__(chunk_size: int, k_reactor=None)
+clear_measurements(keep_last: bool = False)
+append_samples_to_current_measurement(additional_samples: SamplesList)
+add_measurement(name: str, samples: SamplesList = None, timeout: float = 30)
-_writer_loop(output_file: Path, write_queue: Queue, is_writing: Value)
-_flush_chunk()
+save_stdata(filename: Path, timeout: int = 30)
+get_measurements() : List~Measurement~
+load_from_stdata(filename: Path) : List~Measurement~
+load_from_csvs(klipper_CSVs: List~Path~) : List~Measurement~
+__del__()
}
class Measurement {
name: str
samples: List~Sample~
}
Measurement -- MeasurementsManager: contains
note for MeasurementsManager "Manages accelerometer measurements, including writing to disk using a dedicated process."
```
Hey @Frix-x - I've reviewed your changes - here's some feedback:
Overall Comments:
- Consider adding a method to `MeasurementsManager` to check if the writer process is running.
- The `__del__` method in `MeasurementsManager` might not always be called, so consider a more explicit cleanup mechanism.
Here's what I looked at during the review
- 🟡 General issues: 4 issues found
- 🟢 Security: all looks good
- 🟢 Testing: all looks good
- 🟡 Complexity: 1 issue found
- 🟢 Documentation: all looks good
```python
def _writer_loop(self, output_file: Path, write_queue: Queue, is_writing: Value):
    try:
        with open(output_file, 'wb') as f:
            cctx = ZstdCompressor(level=3)
            with cctx.stream_writer(f) as compressor:
                while True:
                    meas = write_queue.get()
                    if meas == STOP_SENTINEL:
                        break
                    with is_writing.get_lock():
```
suggestion (bug_risk): Consider consolidating the _is_writing flag update within a single critical section.
Currently, the flag is set to True immediately before writing and then set to False immediately after. Wrapping both the JSON conversion and the compressor.write call in one lock acquisition may help ensure that the flag accurately reflects the entire duration of the write operation, reducing any potential race conditions.
Suggested implementation:

```python
with is_writing.get_lock():
    is_writing.value = True
    line = (json.dumps(meas) + '\n').encode('utf-8')
    compressor.write(line)
    is_writing.value = False
```
This change assumes that the entire JSON conversion and writing operation should be protected by the lock. If the JSON conversion is time-consuming and you only need to mark that a write is in progress, consider whether additional performance implications need to be evaluated.
No need for this, I just want to be able to see if a writing operation is in progress from the main thread
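For reference, reading that flag from the main thread is a small helper over `multiprocessing.Value`; a sketch (the helper name is hypothetical):

```python
# Hypothetical helper: poll the shared writer flag from the main process.
# `is_writing` is the multiprocessing.Value('b', False) shared with the writer.
def is_write_in_progress(is_writing) -> bool:
    with is_writing.get_lock():
        return bool(is_writing.value)
```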
```python
def run(self, filenames: Union[Path, List[Path]]) -> None:
    filelist = []

    # Single .stdata or a legacy .csv file
    if isinstance(filenames, Path):
        if not filenames.suffix == '.stdata' and not filenames.suffix == '.csv':
            filenames = filenames.with_suffix('.stdata')
        if not filenames.exists():
            raise FileNotFoundError(f'File {filenames} does not exist!')
        filelist.append(filenames)
```
suggestion (bug_risk): Clarify handling of different file types in run().
The method now accepts either a single file or a list of files and treats .stdata and .csv differently. It might be helpful to explicitly document (or enforce) that a homogeneous set of files is expected, or add error handling in case mixed file types are passed accidentally.
Suggested change:

```python
def run(self, filenames: Union[Path, List[Path]]) -> None:
    filelist = []

    # Single file handling: .stdata or legacy .csv
    if isinstance(filenames, Path):
        if filenames.suffix not in ('.stdata', '.csv'):
            filenames = filenames.with_suffix('.stdata')
        if not filenames.exists():
            raise FileNotFoundError(f"File {filenames} does not exist!")
        filelist.append(filenames)

    # List handling: enforce homogeneous file types
    elif isinstance(filenames, list):
        extensions = {f.suffix for f in filenames}
        if len(extensions) > 1:
            raise ValueError(
                "Mixed file types are not allowed. Please supply files of a homogeneous type (.stdata or .csv)."
            )
        for f in filenames:
            if f.suffix not in ('.stdata', '.csv'):
                f = f.with_suffix('.stdata')
            if not f.exists():
                raise FileNotFoundError(f"File {f} does not exist!")
            filelist.append(f)
```
```diff
@@ -36,130 +36,155 @@
 class MeasurementsManager:
```
issue (complexity): Consider extracting the asynchronous writing logic into a dedicated `MeasurementWriter` class to decouple writing and measurement management.

Consider extracting the asynchronous writing and state-monitoring logic into a dedicated helper class (e.g., `MeasurementWriter`). That way, the `MeasurementsManager` can focus solely on measurement management while the writer class handles interprocess communication and buffering.
For example, you could create a new class:

```python
from multiprocessing import Process, Queue, Value
from pathlib import Path
import json
from zstandard import ZstdCompressor, FLUSH_FRAME
from ..helpers.console_output import ConsoleOutput

STOP_SENTINEL = 'STOP_SENTINEL'

class MeasurementWriter:
    def __init__(self, output_file: Path):
        self.output_file = output_file
        self.queue = Queue()
        self.is_writing = Value('b', False)
        self.process = Process(target=self._writer_loop, args=())
        self.process.start()

    def _writer_loop(self):
        try:
            with open(self.output_file, 'wb') as f:
                cctx = ZstdCompressor(level=3)
                with cctx.stream_writer(f) as compressor:
                    while True:
                        meas = self.queue.get()
                        if meas == STOP_SENTINEL:
                            break
                        with self.is_writing.get_lock():
                            self.is_writing.value = True
                        line = (json.dumps(meas) + '\n').encode('utf-8')
                        compressor.write(line)
                        with self.is_writing.get_lock():
                            self.is_writing.value = False
                    compressor.flush(FLUSH_FRAME)
        except Exception as e:
            ConsoleOutput.print(f'Error writing to file {self.output_file}: {e}')

    def enqueue(self, meas):
        self.queue.put(meas)

    def finish(self):
        self.queue.put(STOP_SENTINEL)
        self.process.join()
```
Then, in your `MeasurementsManager`, hand off writing responsibilities:

```python
class MeasurementsManager:
    def __init__(self, chunk_size: int, output_file: Path, k_reactor=None):
        self._chunk_size = chunk_size
        self.measurements = []
        self._k_reactor = k_reactor
        self.writer = MeasurementWriter(output_file)

    def add_measurement(self, name: str, samples: SamplesList = None):
        samples = samples if samples is not None else []
        self.measurements.append({'name': name, 'samples': samples})
        if len(self.measurements) > self._chunk_size:
            self._flush_chunk()

    def _flush_chunk(self):
        if len(self.measurements) <= 1:
            return
        # Flush all measurements except the last one.
        flush_list = self.measurements[:-1]
        for meas in flush_list:
            self.writer.enqueue(meas)
        self.measurements = self.measurements[-1:]

    def save_stdata(self, final_filename: Path, timeout: int = 30):
        # Flush remaining measurements.
        for meas in self.measurements:
            self.writer.enqueue(meas)
        self.measurements = []
        # Signal writer to finish.
        self.writer.finish()
        # Now handle renaming or final file operations.
        try:
            if final_filename.exists():
                final_filename.unlink()
            # Rename temp file to final filename.
            # (Assumes self.writer.output_file is the temp file.)
            Path(self.writer.output_file).rename(final_filename)
        except Exception as e:
            ConsoleOutput.print(f'Error finalizing file {final_filename}: {e}')
```
Steps to reduce complexity:

- Encapsulate writing logic: shift all queue/process management from `MeasurementsManager` to `MeasurementWriter`.
- Simplify the manager: have `MeasurementsManager` only enqueue measurement chunks, without inner process-state checks.
- Abstract the waiting mechanism: if needed, move the waiting logic (based on `k_reactor`) into the writer class, or provide a helper method to check the writer's status (see the sketch after this list).

These changes keep all functionality intact while reducing nesting and interleaving of concerns within a single class.
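To illustrate that last step, here is a hedged sketch of such a status helper on the writer; `is_busy`, `wait_until_idle`, and the polling interval are assumptions for illustration, not part of the PR:

```python
import time

class MeasurementWriter:
    # ... constructor and _writer_loop as above ...

    def is_busy(self) -> bool:
        # True while a write is in progress or data is still queued.
        # Note: queue.empty() is only a hint on multiprocessing queues.
        with self.is_writing.get_lock():
            writing = bool(self.is_writing.value)
        return writing or not self.queue.empty()

    def wait_until_idle(self, timeout: float = 30.0, poll: float = 0.1) -> bool:
        # Poll until the writer drains, returning False on timeout. A Klipper
        # reactor-based caller would replace time.sleep with a reactor pause.
        deadline = time.monotonic() + timeout
        while self.is_busy():
            if time.monotonic() > deadline:
                return False
            time.sleep(poll)
        return True
```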
```diff
@@ -19,6 +20,8 @@
 def compare_belts_responses(gcmd, config, st_process: ShakeTuneProcess) -> None:
```
issue (code-quality): Low code quality found in compare_belts_responses - 16% (low-code-quality)

Explanation: the quality score for this function is below the quality threshold of 25%. This score is a combination of the method length, cognitive complexity, and working memory.

How can you solve this? It might be worth refactoring this function to make it shorter and more readable:

- Reduce the function length by extracting pieces of functionality into their own functions. This is the most important thing you can do - ideally, a function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early (see the sketch after this list).
- Ensure that variables are tightly scoped, so that code using related concepts sits together within the function rather than being scattered.
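To make the guard-clause advice concrete, here is a generic before/after sketch; the functions and their checks are hypothetical and not taken from ShakeTune:

```python
# Before: the happy path is buried under nesting.
def process_nested(samples):
    result = []
    if samples:
        if all(len(s) == 4 for s in samples):
            result = [s[3] for s in samples]
    return result

# After: guard clauses return early and flatten the body.
def process_guarded(samples):
    if not samples:
        return []
    if any(len(s) != 4 for s in samples):
        return []
    return [s[3] for s in samples]
```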
```diff
@@ -20,6 +21,8 @@
 def create_vibrations_profile(gcmd, config, st_process: ShakeTuneProcess) -> None:
```
issue (code-quality): Low code quality found in create_vibrations_profile - 12% (low-code-quality)

Explanation: same 25% threshold and the same refactoring suggestions as for compare_belts_responses above.
```diff
@@ -17,6 +18,8 @@
 def excitate_axis_at_freq(gcmd, config, st_process: ShakeTuneProcess) -> None:
```
issue (code-quality): Low code quality found in excitate_axis_at_freq - 15% (low-code-quality)

Explanation: same 25% threshold and the same refactoring suggestions as for compare_belts_responses above.
```python
except Exception as e:
    ConsoleOutput.print(f'Warning: unable to assemble chunks into {filename}: {e}')

# Add extension if not provided
if not filename.suffix == '.stdata':
```
suggestion (code-quality): Simplify logical expression using De Morgan identities (de-morgan)
Suggested change:

```python
if filename.suffix != '.stdata':
```
```python
# Single .stdata or a legacy .csv file
if isinstance(filenames, Path):
    if not filenames.suffix == '.stdata' and not filenames.suffix == '.csv':
```
suggestion (code-quality): We've found these issues:

- Simplify logical expression using De Morgan identities [×2] (de-morgan)
- Replace multiple comparisons of same variable with `in` operator (merge-comparisons)
Suggested change:

```python
if filenames.suffix not in ['.stdata', '.csv']:
```
### Summary by Sourcery

Refactor the accelerometer data saving mechanism to use a dedicated writer process and queue. This prevents blocking I/O operations and improves data integrity. Update the data format to JSON lines for improved handling and parsing. Modify `shaketune_process.py` to accept filenames as input and load data accordingly. Update graph creation to use the new data loading mechanism and save graphs with consistent naming. Fix various bugs related to data handling and analysis, and improve error handling and reporting.
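Since the summary mentions the move to zstd-compressed JSON lines, here is a minimal reading sketch consistent with the writer loop shown earlier; the function name and file path are illustrative, and the per-line schema (`{'name': ..., 'samples': ...}`) is assumed:

```python
import io
import json
from zstandard import ZstdDecompressor

def read_stdata(path: str):
    # Stream-decompress the zstd file and parse one JSON object per line.
    measurements = []
    with open(path, 'rb') as f:
        reader = ZstdDecompressor().stream_reader(f)
        for line in io.TextIOWrapper(reader, encoding='utf-8'):
            if line.strip():
                measurements.append(json.loads(line))
    return measurements

# Usage: measurements = read_stdata('shaketune_results.stdata')
```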