Skip to content

API Reference

Complete API documentation for pychrony, auto-generated from source code docstrings.

Connection

The main entry point for interacting with chronyd.

ChronyConnection

Context manager for chrony connections.

Provides connection reuse for multiple queries to chronyd within a single context, properly managing socket and session lifecycle.

Parameters:

Name Type Description Default
address str | None

Connection address. Supports:

  • Unix socket path: "/run/chrony/chronyd.sock"
  • IPv4: "192.168.1.1" or "192.168.1.1:323"
  • IPv6: "2001:db8::1" or "[2001:db8::1]:323"
  • None: Auto-detect (tries Unix socket paths, then localhost)
None

Methods:

Name Description
get_tracking

Get current NTP tracking status (returns TrackingStatus).

get_sources

Get configured time sources (returns list[Source]).

get_source_stats

Get source statistics (returns list[SourceStats]).

get_rtc_data

Get RTC tracking data (returns RTCData or None).

Thread Safety

NOT thread-safe. Each thread needs its own connection.

See Also

TrackingStatus: Tracking data model. Source: Time source data model. SourceStats: Source statistics data model. RTCData: RTC tracking data model.

Examples:

>>> with ChronyConnection() as conn:
...     tracking = conn.get_tracking()
...     sources = conn.get_sources()
...     stats = conn.get_source_stats()
...     rtc = conn.get_rtc_data()
Source code in src/pychrony/_core/_bindings.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
class ChronyConnection:
    """Context manager for chrony connections.

    Provides connection reuse for multiple queries to chronyd within a single
    context, properly managing socket and session lifecycle.

    Args:
        address: Connection address. Supports:

            - Unix socket path: ``"/run/chrony/chronyd.sock"``
            - IPv4: ``"192.168.1.1"`` or ``"192.168.1.1:323"``
            - IPv6: ``"2001:db8::1"`` or ``"[2001:db8::1]:323"``
            - ``None``: Auto-detect (tries Unix socket paths, then localhost)

    Methods:
        get_tracking: Get current NTP tracking status (returns `TrackingStatus`).
        get_sources: Get configured time sources (returns ``list[Source]``).
        get_source_stats: Get source statistics (returns ``list[SourceStats]``).
        get_rtc_data: Get RTC tracking data (returns `RTCData` or ``None``).

    Thread Safety:
        NOT thread-safe. Each thread needs its own connection.

    See Also:
        `TrackingStatus`: Tracking data model.
        `Source`: Time source data model.
        `SourceStats`: Source statistics data model.
        `RTCData`: RTC tracking data model.

    Examples:
        >>> with ChronyConnection() as conn:
        ...     tracking = conn.get_tracking()
        ...     sources = conn.get_sources()
        ...     stats = conn.get_source_stats()
        ...     rtc = conn.get_rtc_data()
    """

    def __init__(self, address: str | None = None) -> None:
        """Initialize ChronyConnection with optional address.

        Args:
            address: Connection address (see class docstring for formats)
        """
        self._address = address
        self._fd: int | None = None
        self._session: Any = None
        self._session_ptr: Any = None
        self._in_context = False

    def __enter__(self) -> "ChronyConnection":
        """Enter context manager, opening connection to chronyd."""
        _check_library_available()
        self._open()
        self._in_context = True
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit context manager, closing connection to chronyd."""
        self._in_context = False
        self._close()

    def _resolve_address(self) -> bytes | None:
        """Resolve the address to use for connection.

        Returns:
            Encoded address bytes, or None for auto-detect

        Raises:
            ChronyConnectionError: If no socket path found during auto-detect
        """
        if self._address is not None:
            return self._address.encode()

        # Auto-detect: try default Unix socket paths
        for path in DEFAULT_SOCKET_PATHS:
            if os.path.exists(path):
                return path.encode()

        # No Unix socket found - pass NULL to let libchrony try localhost
        return None

    def _open(self) -> None:
        """Open socket connection and initialize session.

        Raises:
            ChronyConnectionError: If connection fails
            ChronyPermissionError: If permission denied
        """
        address_bytes = self._resolve_address()

        # Open socket connection
        if address_bytes is not None:
            self._fd = _lib.chrony_open_socket(address_bytes)
        else:
            self._fd = _lib.chrony_open_socket(_ffi.NULL)

        if self._fd < 0:
            # Check for permission issues
            if self._fd == -errno.EACCES or (
                self._address is not None
                and os.path.exists(self._address)
                and not os.access(self._address, os.R_OK | os.W_OK)
            ):
                raise ChronyPermissionError(
                    f"Permission denied accessing {self._address or 'chronyd'}. "
                    "Run as root or add user to chrony group.",
                    error_code=self._fd,
                )
            address_desc = self._address or "chronyd (auto-detect)"
            raise ChronyConnectionError(
                f"Failed to connect to {address_desc}. Is chronyd running?",
                error_code=self._fd,
            )

        # Initialize session
        self._session_ptr = _ffi.new("chrony_session **")
        err = _lib.chrony_init_session(self._session_ptr, self._fd)
        if err != 0:
            # Clean up socket on failure
            _lib.chrony_close_socket(self._fd)
            self._fd = None
            raise ChronyConnectionError(
                "Failed to initialize chrony session",
                error_code=err,
            )

        self._session = self._session_ptr[0]

    def _close(self) -> None:
        """Close session and socket connection."""
        if self._session is not None and self._session != _ffi.NULL:
            _lib.chrony_deinit_session(self._session)
            self._session = None

        if self._fd is not None and self._fd >= 0:
            _lib.chrony_close_socket(self._fd)
            self._fd = None

        self._session_ptr = None

    def _ensure_context(self) -> None:
        """Ensure we're within a context manager.

        Raises:
            RuntimeError: If called outside context manager
        """
        if not self._in_context:
            raise RuntimeError(
                "ChronyConnection methods must be called within a 'with' block"
            )

    def _request_report(self, report_name: bytes) -> int:
        """Request number of records for a report type.

        Args:
            report_name: Report name (e.g., b"tracking", b"sources")

        Returns:
            Number of records available

        Raises:
            ChronyDataError: If request fails
        """
        err = _lib.chrony_request_report_number_records(self._session, report_name)
        if err != 0:
            raise ChronyDataError(
                f"Failed to request {report_name.decode()} report",
                error_code=err,
            )

        while _lib.chrony_needs_response(self._session):
            err = _lib.chrony_process_response(self._session)
            if err != 0:
                raise ChronyDataError(
                    f"Failed to process {report_name.decode()} response",
                    error_code=err,
                )

        return _lib.chrony_get_report_number_records(self._session)

    def _request_record(self, report_name: bytes, index: int) -> None:
        """Request a specific record from a report.

        Args:
            report_name: Report name (e.g., b"tracking", b"sources")
            index: Record index

        Raises:
            ChronyDataError: If request fails
        """
        err = _lib.chrony_request_record(self._session, report_name, index)
        if err != 0:
            raise ChronyDataError(
                f"Failed to request {report_name.decode()} record {index}",
                error_code=err,
            )

        while _lib.chrony_needs_response(self._session):
            err = _lib.chrony_process_response(self._session)
            if err != 0:
                raise ChronyDataError(
                    f"Failed to process {report_name.decode()} record {index}",
                    error_code=err,
                )

    def get_tracking(self) -> TrackingStatus:
        """Get current tracking status from chronyd.

        Returns:
            TrackingStatus: Current tracking information from chronyd.

        Raises:
            RuntimeError: If called outside context manager
            ChronyDataError: If tracking data is invalid or incomplete.

        Examples:
            >>> with ChronyConnection() as conn:
            ...     status = conn.get_tracking()
            ...     print(f"Offset: {status.offset:.6f} seconds")
        """
        self._ensure_context()

        num_records = self._request_report(b"tracking")
        if num_records < 1:
            raise ChronyDataError("No tracking records available")

        self._request_record(b"tracking", 0)

        # Extract fields
        ref_id = _get_uinteger_field(self._session, "reference ID")
        leap_status_int = _get_uinteger_field(self._session, "leap status")

        try:
            leap_status = LeapStatus(leap_status_int)
        except ValueError:
            raise ChronyDataError(
                f"Unknown leap_status value {leap_status_int}. "
                "This may indicate a newer chrony version - please update pychrony."
            )

        data = {
            "reference_id": ref_id,
            "reference_id_name": _ref_id_to_name(ref_id),
            "reference_ip": _get_string_field(self._session, "address"),
            "stratum": _get_uinteger_field(self._session, "stratum"),
            "leap_status": leap_status,
            "ref_time": _get_timespec_field(self._session, "reference time"),
            "offset": _get_float_field(self._session, "current correction"),
            "last_offset": _get_float_field(self._session, "last offset"),
            "rms_offset": _get_float_field(self._session, "RMS offset"),
            "frequency": _get_float_field(self._session, "frequency offset"),
            "residual_freq": _get_float_field(self._session, "residual frequency"),
            "skew": _get_float_field(self._session, "skew"),
            "root_delay": _get_float_field(self._session, "root delay"),
            "root_dispersion": _get_float_field(self._session, "root dispersion"),
            "update_interval": _get_float_field(self._session, "last update interval"),
        }

        # Validate
        self._validate_tracking(data)

        return TrackingStatus(**data)

    def _validate_tracking(self, data: dict) -> None:
        """Validate tracking data before creating TrackingStatus."""
        if not 0 <= data["stratum"] <= 15:
            raise ChronyDataError(f"Invalid stratum: {data['stratum']}")

        float_fields = [
            "ref_time",
            "offset",
            "last_offset",
            "rms_offset",
            "frequency",
            "residual_freq",
            "skew",
            "root_delay",
            "root_dispersion",
            "update_interval",
        ]
        for field in float_fields:
            if math.isnan(data[field]) or math.isinf(data[field]):
                raise ChronyDataError(f"Invalid {field}: {data[field]}")

        non_negative = [
            "ref_time",
            "rms_offset",
            "skew",
            "root_delay",
            "root_dispersion",
            "update_interval",
        ]
        for field in non_negative:
            if data[field] < 0:
                raise ChronyDataError(f"{field} must be non-negative: {data[field]}")

    def get_sources(self) -> list[Source]:
        """Get all configured time sources from chronyd.

        Returns:
            list[Source]: List of Source objects for each configured source.
                Empty list if no sources are configured.

        Raises:
            RuntimeError: If called outside context manager
            ChronyDataError: If source data is invalid or incomplete.

        Examples:
            >>> with ChronyConnection() as conn:
            ...     sources = conn.get_sources()
            ...     for src in sources:
            ...         print(f"{src.address}: stratum {src.stratum}")
        """
        self._ensure_context()

        num_records = self._request_report(b"sources")
        if num_records < 1:
            return []

        sources = []
        for i in range(num_records):
            self._request_record(b"sources", i)
            data = self._extract_source()
            self._validate_source(data)
            sources.append(Source(**data))

        return sources

    def _extract_source(self) -> dict:
        """Extract source fields from the current session record."""
        state_int = _get_uinteger_field(self._session, "state")
        mode_int = _get_uinteger_field(self._session, "mode")

        try:
            state = SourceState(state_int)
        except ValueError:
            raise ChronyDataError(
                f"Unknown state value {state_int}. "
                "This may indicate a newer chrony version - please update pychrony."
            )

        try:
            mode = SourceMode(mode_int)
        except ValueError:
            raise ChronyDataError(
                f"Unknown mode value {mode_int}. "
                "This may indicate a newer chrony version - please update pychrony."
            )

        # In libchrony 0.2, sources report uses TYPE_ADDRESS_OR_UINT32_IN_ADDRESS
        # which exposes either "address" (NTP sources) or "reference ID" (refclocks).
        # We check mode to determine which field to fetch.
        if mode == SourceMode.REFCLOCK:
            ref_id = _get_uinteger_field(self._session, "reference ID")
            address = _ref_id_to_name(ref_id)
        else:
            address = _get_string_field(self._session, "address")

        return {
            "address": address,
            "poll": _get_integer_field(self._session, "poll"),
            "stratum": _get_uinteger_field(self._session, "stratum"),
            "state": state,
            "mode": mode,
            "flags": _get_uinteger_field(self._session, "flags"),
            "reachability": _get_uinteger_field(self._session, "reachability"),
            "last_sample_ago": _get_uinteger_field(self._session, "last sample ago"),
            "orig_latest_meas": _get_float_field(
                self._session, "original last sample offset"
            ),
            "latest_meas": _get_float_field(
                self._session, "adjusted last sample offset"
            ),
            "latest_meas_err": _get_float_field(self._session, "last sample error"),
        }

    def _validate_source(self, data: dict) -> None:
        """Validate source data before creating Source."""
        _validate_bounded_int(data["stratum"], "stratum", 0, 15)
        _validate_bounded_int(data["reachability"], "reachability", 0, 255)
        _validate_non_negative_int(data["last_sample_ago"], "last_sample_ago")

        for field in ["orig_latest_meas", "latest_meas", "latest_meas_err"]:
            _validate_finite_float(data[field], field)

        if data["latest_meas_err"] < 0:
            raise ChronyDataError(
                f"latest_meas_err must be non-negative: {data['latest_meas_err']}"
            )

    def get_source_stats(self) -> list[SourceStats]:
        """Get statistical data for all time sources from chronyd.

        Returns:
            list[SourceStats]: List of SourceStats objects for each source.
                Empty list if no sources are configured.

        Raises:
            RuntimeError: If called outside context manager
            ChronyDataError: If statistics data is invalid or incomplete.

        Examples:
            >>> with ChronyConnection() as conn:
            ...     stats = conn.get_source_stats()
            ...     for s in stats:
            ...         print(f"{s.address}: {s.samples} samples")
        """
        self._ensure_context()

        num_records = self._request_report(b"sourcestats")
        if num_records < 1:
            return []

        stats = []
        for i in range(num_records):
            self._request_record(b"sourcestats", i)
            data = self._extract_sourcestats()
            self._validate_sourcestats(data)
            stats.append(SourceStats(**data))

        return stats

    def _extract_sourcestats(self) -> dict:
        """Extract sourcestats fields from the current session record."""
        return {
            "reference_id": _get_uinteger_field(self._session, "reference ID"),
            "address": _get_string_field(self._session, "address"),
            "samples": _get_uinteger_field(self._session, "samples"),
            "runs": _get_uinteger_field(self._session, "runs"),
            "span": _get_uinteger_field(self._session, "span"),
            "std_dev": _get_float_field(self._session, "standard deviation"),
            "resid_freq": _get_float_field(self._session, "residual frequency"),
            "skew": _get_float_field(self._session, "skew"),
            "offset": _get_float_field(self._session, "offset"),
            "offset_err": _get_float_field(self._session, "offset error"),
        }

    def _validate_sourcestats(self, data: dict) -> None:
        """Validate sourcestats data before creating SourceStats."""
        _validate_non_negative_int(data["samples"], "samples")
        _validate_non_negative_int(data["runs"], "runs")
        _validate_non_negative_int(data["span"], "span")

        for field in ["std_dev", "resid_freq", "skew", "offset", "offset_err"]:
            _validate_finite_float(data[field], field)

        if data["std_dev"] < 0:
            raise ChronyDataError(f"std_dev must be non-negative: {data['std_dev']}")
        if data["skew"] < 0:
            raise ChronyDataError(f"skew must be non-negative: {data['skew']}")
        if data["offset_err"] < 0:
            raise ChronyDataError(
                f"offset_err must be non-negative: {data['offset_err']}"
            )

    def get_rtc_data(self) -> RTCData | None:
        """Get Real-Time Clock tracking data from chronyd.

        Returns:
            RTCData if RTC tracking is enabled, None otherwise.

        Raises:
            RuntimeError: If called outside context manager
            ChronyDataError: If RTC data is invalid or malformed.

        Examples:
            >>> with ChronyConnection() as conn:
            ...     rtc = conn.get_rtc_data()
            ...     if rtc:
            ...         print(f"RTC offset: {rtc.offset:.6f}s")
        """
        self._ensure_context()

        num_records = self._request_report(b"rtcdata")
        if num_records < 1:
            return None

        # Try to fetch rtcdata record - may fail if RTC not actually configured
        try:
            err = _lib.chrony_request_record(self._session, b"rtcdata", 0)
            if err != 0:
                return None

            while _lib.chrony_needs_response(self._session):
                err = _lib.chrony_process_response(self._session)
                if err != 0:
                    return None
        except Exception:
            return None

        data = self._extract_rtc()
        self._validate_rtc(data)

        return RTCData(**data)

    def _extract_rtc(self) -> dict:
        """Extract RTC fields from the current session record."""
        return {
            "ref_time": _get_timespec_field(self._session, "reference time"),
            "samples": _get_uinteger_field(self._session, "samples"),
            "runs": _get_uinteger_field(self._session, "runs"),
            "span": _get_uinteger_field(self._session, "span"),
            "offset": _get_float_field(self._session, "offset"),
            "freq_offset": _get_float_field(self._session, "frequency offset"),
        }

    def _validate_rtc(self, data: dict) -> None:
        """Validate RTC data before creating RTCData."""
        _validate_non_negative_int(data["samples"], "samples")
        _validate_non_negative_int(data["runs"], "runs")
        _validate_non_negative_int(data["span"], "span")

        for field in ["ref_time", "offset", "freq_offset"]:
            _validate_finite_float(data[field], field)

        if data["ref_time"] < 0:
            raise ChronyDataError(f"ref_time must be non-negative: {data['ref_time']}")

__init__(address=None)

Initialize ChronyConnection with optional address.

Parameters:

Name Type Description Default
address str | None

Connection address (see class docstring for formats)

None
Source code in src/pychrony/_core/_bindings.py
209
210
211
212
213
214
215
216
217
218
219
def __init__(self, address: str | None = None) -> None:
    """Initialize ChronyConnection with optional address.

    Args:
        address: Connection address (see class docstring for formats)
    """
    self._address = address
    self._fd: int | None = None
    self._session: Any = None
    self._session_ptr: Any = None
    self._in_context = False

__enter__()

Enter context manager, opening connection to chronyd.

Source code in src/pychrony/_core/_bindings.py
221
222
223
224
225
226
def __enter__(self) -> "ChronyConnection":
    """Enter context manager, opening connection to chronyd."""
    _check_library_available()
    self._open()
    self._in_context = True
    return self

__exit__(exc_type, exc_val, exc_tb)

Exit context manager, closing connection to chronyd.

Source code in src/pychrony/_core/_bindings.py
228
229
230
231
232
233
234
235
236
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit context manager, closing connection to chronyd."""
    self._in_context = False
    self._close()

get_tracking()

Get current tracking status from chronyd.

Returns:

Name Type Description
TrackingStatus TrackingStatus

Current tracking information from chronyd.

Raises:

Type Description
RuntimeError

If called outside context manager

ChronyDataError

If tracking data is invalid or incomplete.

Examples:

>>> with ChronyConnection() as conn:
...     status = conn.get_tracking()
...     print(f"Offset: {status.offset:.6f} seconds")
Source code in src/pychrony/_core/_bindings.py
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
def get_tracking(self) -> TrackingStatus:
    """Get current tracking status from chronyd.

    Returns:
        TrackingStatus: Current tracking information from chronyd.

    Raises:
        RuntimeError: If called outside context manager
        ChronyDataError: If tracking data is invalid or incomplete.

    Examples:
        >>> with ChronyConnection() as conn:
        ...     status = conn.get_tracking()
        ...     print(f"Offset: {status.offset:.6f} seconds")
    """
    self._ensure_context()

    num_records = self._request_report(b"tracking")
    if num_records < 1:
        raise ChronyDataError("No tracking records available")

    self._request_record(b"tracking", 0)

    # Extract fields
    ref_id = _get_uinteger_field(self._session, "reference ID")
    leap_status_int = _get_uinteger_field(self._session, "leap status")

    try:
        leap_status = LeapStatus(leap_status_int)
    except ValueError:
        raise ChronyDataError(
            f"Unknown leap_status value {leap_status_int}. "
            "This may indicate a newer chrony version - please update pychrony."
        )

    data = {
        "reference_id": ref_id,
        "reference_id_name": _ref_id_to_name(ref_id),
        "reference_ip": _get_string_field(self._session, "address"),
        "stratum": _get_uinteger_field(self._session, "stratum"),
        "leap_status": leap_status,
        "ref_time": _get_timespec_field(self._session, "reference time"),
        "offset": _get_float_field(self._session, "current correction"),
        "last_offset": _get_float_field(self._session, "last offset"),
        "rms_offset": _get_float_field(self._session, "RMS offset"),
        "frequency": _get_float_field(self._session, "frequency offset"),
        "residual_freq": _get_float_field(self._session, "residual frequency"),
        "skew": _get_float_field(self._session, "skew"),
        "root_delay": _get_float_field(self._session, "root delay"),
        "root_dispersion": _get_float_field(self._session, "root dispersion"),
        "update_interval": _get_float_field(self._session, "last update interval"),
    }

    # Validate
    self._validate_tracking(data)

    return TrackingStatus(**data)

get_sources()

Get all configured time sources from chronyd.

Returns:

Type Description
list[Source]

list[Source]: List of Source objects for each configured source. Empty list if no sources are configured.

Raises:

Type Description
RuntimeError

If called outside context manager

ChronyDataError

If source data is invalid or incomplete.

Examples:

>>> with ChronyConnection() as conn:
...     sources = conn.get_sources()
...     for src in sources:
...         print(f"{src.address}: stratum {src.stratum}")
Source code in src/pychrony/_core/_bindings.py
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
def get_sources(self) -> list[Source]:
    """Get all configured time sources from chronyd.

    Returns:
        list[Source]: List of Source objects for each configured source.
            Empty list if no sources are configured.

    Raises:
        RuntimeError: If called outside context manager
        ChronyDataError: If source data is invalid or incomplete.

    Examples:
        >>> with ChronyConnection() as conn:
        ...     sources = conn.get_sources()
        ...     for src in sources:
        ...         print(f"{src.address}: stratum {src.stratum}")
    """
    self._ensure_context()

    num_records = self._request_report(b"sources")
    if num_records < 1:
        return []

    sources = []
    for i in range(num_records):
        self._request_record(b"sources", i)
        data = self._extract_source()
        self._validate_source(data)
        sources.append(Source(**data))

    return sources

get_source_stats()

Get statistical data for all time sources from chronyd.

Returns:

Type Description
list[SourceStats]

list[SourceStats]: List of SourceStats objects for each source. Empty list if no sources are configured.

Raises:

Type Description
RuntimeError

If called outside context manager

ChronyDataError

If statistics data is invalid or incomplete.

Examples:

>>> with ChronyConnection() as conn:
...     stats = conn.get_source_stats()
...     for s in stats:
...         print(f"{s.address}: {s.samples} samples")
Source code in src/pychrony/_core/_bindings.py
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
def get_source_stats(self) -> list[SourceStats]:
    """Get statistical data for all time sources from chronyd.

    Returns:
        list[SourceStats]: List of SourceStats objects for each source.
            Empty list if no sources are configured.

    Raises:
        RuntimeError: If called outside context manager
        ChronyDataError: If statistics data is invalid or incomplete.

    Examples:
        >>> with ChronyConnection() as conn:
        ...     stats = conn.get_source_stats()
        ...     for s in stats:
        ...         print(f"{s.address}: {s.samples} samples")
    """
    self._ensure_context()

    num_records = self._request_report(b"sourcestats")
    if num_records < 1:
        return []

    stats = []
    for i in range(num_records):
        self._request_record(b"sourcestats", i)
        data = self._extract_sourcestats()
        self._validate_sourcestats(data)
        stats.append(SourceStats(**data))

    return stats

get_rtc_data()

Get Real-Time Clock tracking data from chronyd.

Returns:

Type Description
RTCData | None

RTCData if RTC tracking is enabled, None otherwise.

Raises:

Type Description
RuntimeError

If called outside context manager

ChronyDataError

If RTC data is invalid or malformed.

Examples:

>>> with ChronyConnection() as conn:
...     rtc = conn.get_rtc_data()
...     if rtc:
...         print(f"RTC offset: {rtc.offset:.6f}s")
Source code in src/pychrony/_core/_bindings.py
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
def get_rtc_data(self) -> RTCData | None:
    """Get Real-Time Clock tracking data from chronyd.

    Returns:
        RTCData if RTC tracking is enabled, None otherwise.

    Raises:
        RuntimeError: If called outside context manager
        ChronyDataError: If RTC data is invalid or malformed.

    Examples:
        >>> with ChronyConnection() as conn:
        ...     rtc = conn.get_rtc_data()
        ...     if rtc:
        ...         print(f"RTC offset: {rtc.offset:.6f}s")
    """
    self._ensure_context()

    num_records = self._request_report(b"rtcdata")
    if num_records < 1:
        return None

    # Try to fetch rtcdata record - may fail if RTC not actually configured
    try:
        err = _lib.chrony_request_record(self._session, b"rtcdata", 0)
        if err != 0:
            return None

        while _lib.chrony_needs_response(self._session):
            err = _lib.chrony_process_response(self._session)
            if err != 0:
                return None
    except Exception:
        return None

    data = self._extract_rtc()
    self._validate_rtc(data)

    return RTCData(**data)

Data Models

Frozen dataclasses representing chronyd report data.

TrackingStatus dataclass

Chrony tracking status information.

Represents the current time synchronization state from chronyd, including offset, frequency, and accuracy metrics.

Attributes:

Name Type Description
reference_id int

NTP reference identifier (uint32 as hex IP or name).

reference_id_name str

Human-readable reference source name.

reference_ip str

IP address of reference source (IPv4, IPv6, or ID#).

stratum int

NTP stratum level (0=reference clock, 1-15=downstream).

leap_status LeapStatus

Leap second status (see LeapStatus).

ref_time float

Timestamp of last measurement (seconds since epoch).

offset float

Current offset from reference (seconds, can be negative).

last_offset float

Offset at last measurement (seconds).

rms_offset float

Root mean square of recent offsets (seconds).

frequency float

Clock frequency error (parts per million).

residual_freq float

Residual frequency for current source (ppm).

skew float

Estimated error bound on frequency (ppm).

root_delay float

Total roundtrip delay to stratum-1 source (seconds).

root_dispersion float

Total dispersion to reference (seconds).

update_interval float

Seconds since last successful update.

See Also

LeapStatus: Enum for leap second status values. ChronyConnection.get_tracking: Method to retrieve this data.

Source code in src/pychrony/models.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
@dataclass(frozen=True)
class TrackingStatus:
    """Chrony tracking status information.

    Represents the current time synchronization state from chronyd,
    including offset, frequency, and accuracy metrics.

    Attributes:
        reference_id: NTP reference identifier (uint32 as hex IP or name).
        reference_id_name: Human-readable reference source name.
        reference_ip: IP address of reference source (IPv4, IPv6, or ID#).
        stratum: NTP stratum level (0=reference clock, 1-15=downstream).
        leap_status: Leap second status (see `LeapStatus`).
        ref_time: Timestamp of last measurement (seconds since epoch).
        offset: Current offset from reference (seconds, can be negative).
        last_offset: Offset at last measurement (seconds).
        rms_offset: Root mean square of recent offsets (seconds).
        frequency: Clock frequency error (parts per million).
        residual_freq: Residual frequency for current source (ppm).
        skew: Estimated error bound on frequency (ppm).
        root_delay: Total roundtrip delay to stratum-1 source (seconds).
        root_dispersion: Total dispersion to reference (seconds).
        update_interval: Seconds since last successful update.

    See Also:
        `LeapStatus`: Enum for leap second status values.
        `ChronyConnection.get_tracking`: Method to retrieve this data.
    """

    reference_id: int
    reference_id_name: str
    reference_ip: str
    stratum: int
    leap_status: LeapStatus
    ref_time: float
    offset: float
    last_offset: float
    rms_offset: float
    frequency: float
    residual_freq: float
    skew: float
    root_delay: float
    root_dispersion: float
    update_interval: float

    def is_synchronized(self) -> bool:
        """Check if chronyd is synchronized to a source.

        Returns:
            True if synchronized (reference_id != 0 and stratum < 16),
            False otherwise.

        Examples:
            >>> with ChronyConnection() as conn:
            ...     status = conn.get_tracking()
            ...     if status.is_synchronized():
            ...         print(f"Synced to {status.reference_ip}")
        """
        return self.reference_id != 0 and self.stratum < 16

    def is_leap_pending(self) -> bool:
        """Check if a leap second adjustment is pending.

        Returns:
            True if leap_status is INSERT or DELETE,
            False otherwise.

        Examples:
            >>> with ChronyConnection() as conn:
            ...     status = conn.get_tracking()
            ...     if status.is_leap_pending():
            ...         print(f"Leap second pending: {status.leap_status.name}")
        """
        return self.leap_status in (LeapStatus.INSERT, LeapStatus.DELETE)

is_synchronized()

Check if chronyd is synchronized to a source.

Returns:

Type Description
bool

True if synchronized (reference_id != 0 and stratum < 16),

bool

False otherwise.

Examples:

>>> with ChronyConnection() as conn:
...     status = conn.get_tracking()
...     if status.is_synchronized():
...         print(f"Synced to {status.reference_ip}")
Source code in src/pychrony/models.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def is_synchronized(self) -> bool:
    """Check if chronyd is synchronized to a source.

    Returns:
        True if synchronized (reference_id != 0 and stratum < 16),
        False otherwise.

    Examples:
        >>> with ChronyConnection() as conn:
        ...     status = conn.get_tracking()
        ...     if status.is_synchronized():
        ...         print(f"Synced to {status.reference_ip}")
    """
    return self.reference_id != 0 and self.stratum < 16

is_leap_pending()

Check if a leap second adjustment is pending.

Returns:

Type Description
bool

True if leap_status is INSERT or DELETE,

bool

False otherwise.

Examples:

>>> with ChronyConnection() as conn:
...     status = conn.get_tracking()
...     if status.is_leap_pending():
...         print(f"Leap second pending: {status.leap_status.name}")
Source code in src/pychrony/models.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def is_leap_pending(self) -> bool:
    """Check if a leap second adjustment is pending.

    Returns:
        True if leap_status is INSERT or DELETE,
        False otherwise.

    Examples:
        >>> with ChronyConnection() as conn:
        ...     status = conn.get_tracking()
        ...     if status.is_leap_pending():
        ...         print(f"Leap second pending: {status.leap_status.name}")
    """
    return self.leap_status in (LeapStatus.INSERT, LeapStatus.DELETE)

Source dataclass

Chrony source information.

Represents an NTP server, peer, or reference clock being used as a time source by chronyd.

Attributes:

Name Type Description
address str

IP address or reference ID of the source (IPv4, IPv6, or refclock ID).

poll int

Polling interval as log2 seconds (e.g., 6 means 64 seconds).

stratum int

NTP stratum level of the source (0-15).

state SourceState

Selection state (see SourceState).

mode SourceMode

Source mode (see SourceMode).

flags int

Source flags (bitfield).

reachability int

Reachability register (8-bit, 377 octal = all recent polls succeeded).

last_sample_ago int

Seconds since last valid sample was received.

orig_latest_meas float

Original last sample offset (seconds).

latest_meas float

Adjusted last sample offset (seconds).

latest_meas_err float

Last sample error bound (seconds).

See Also

SourceState: Enum for source selection states. SourceMode: Enum for source operational modes. ChronyConnection.get_sources: Method to retrieve source list.

Source code in src/pychrony/models.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
@dataclass(frozen=True)
class Source:
    """Chrony source information.

    Represents an NTP server, peer, or reference clock being used
    as a time source by chronyd.

    Attributes:
        address: IP address or reference ID of the source (IPv4, IPv6, or refclock ID).
        poll: Polling interval as log2 seconds (e.g., 6 means 64 seconds).
        stratum: NTP stratum level of the source (0-15).
        state: Selection state (see `SourceState`).
        mode: Source mode (see `SourceMode`).
        flags: Source flags (bitfield).
        reachability: Reachability register (8-bit, 377 octal = all recent polls succeeded).
        last_sample_ago: Seconds since last valid sample was received.
        orig_latest_meas: Original last sample offset (seconds).
        latest_meas: Adjusted last sample offset (seconds).
        latest_meas_err: Last sample error bound (seconds).

    See Also:
        `SourceState`: Enum for source selection states.
        `SourceMode`: Enum for source operational modes.
        `ChronyConnection.get_sources`: Method to retrieve source list.
    """

    address: str
    poll: int
    stratum: int
    state: SourceState
    mode: SourceMode
    flags: int
    reachability: int
    last_sample_ago: int
    orig_latest_meas: float
    latest_meas: float
    latest_meas_err: float

    def is_reachable(self) -> bool:
        """Check if the source has been reachable recently.

        Returns:
            True if reachability register is non-zero (at least one successful poll).

        Examples:
            >>> with ChronyConnection() as conn:
            ...     for src in conn.get_sources():
            ...         if not src.is_reachable():
            ...             print(f"Source {src.address} is unreachable")
        """
        return self.reachability > 0

    def is_selected(self) -> bool:
        """Check if this source is currently selected for synchronization.

        Returns:
            True if state is SELECTED.

        Examples:
            >>> with ChronyConnection() as conn:
            ...     for src in conn.get_sources():
            ...         if src.is_selected():
            ...             print(f"Currently using {src.address}")
        """
        return self.state == SourceState.SELECTED

is_reachable()

Check if the source has been reachable recently.

Returns:

Type Description
bool

True if reachability register is non-zero (at least one successful poll).

Examples:

>>> with ChronyConnection() as conn:
...     for src in conn.get_sources():
...         if not src.is_reachable():
...             print(f"Source {src.address} is unreachable")
Source code in src/pychrony/models.py
225
226
227
228
229
230
231
232
233
234
235
236
237
def is_reachable(self) -> bool:
    """Check if the source has been reachable recently.

    Returns:
        True if reachability register is non-zero (at least one successful poll).

    Examples:
        >>> with ChronyConnection() as conn:
        ...     for src in conn.get_sources():
        ...         if not src.is_reachable():
        ...             print(f"Source {src.address} is unreachable")
    """
    return self.reachability > 0

is_selected()

Check if this source is currently selected for synchronization.

Returns:

Type Description
bool

True if state is SELECTED.

Examples:

>>> with ChronyConnection() as conn:
...     for src in conn.get_sources():
...         if src.is_selected():
...             print(f"Currently using {src.address}")
Source code in src/pychrony/models.py
239
240
241
242
243
244
245
246
247
248
249
250
251
def is_selected(self) -> bool:
    """Check if this source is currently selected for synchronization.

    Returns:
        True if state is SELECTED.

    Examples:
        >>> with ChronyConnection() as conn:
        ...     for src in conn.get_sources():
        ...         if src.is_selected():
        ...             print(f"Currently using {src.address}")
    """
    return self.state == SourceState.SELECTED

SourceStats dataclass

Chrony source statistics.

Represents statistical data about measurements from an NTP source, used for drift and offset estimation.

Attributes:

Name Type Description
reference_id int

32-bit NTP reference identifier.

address str

IP address of the source (empty for reference clocks).

samples int

Number of sample points currently retained.

runs int

Number of runs of residuals with same sign.

span int

Time interval between oldest and newest samples (seconds).

std_dev float

Estimated sample standard deviation (seconds).

resid_freq float

Residual frequency (parts per million).

skew float

Frequency skew (error bound) in ppm.

offset float

Estimated offset of the source (seconds).

offset_err float

Offset error bound (seconds).

See Also

ChronyConnection.get_source_stats: Method to retrieve statistics.

Source code in src/pychrony/models.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
@dataclass(frozen=True)
class SourceStats:
    """Chrony source statistics.

    Represents statistical data about measurements from an NTP source,
    used for drift and offset estimation.

    Attributes:
        reference_id: 32-bit NTP reference identifier.
        address: IP address of the source (empty for reference clocks).
        samples: Number of sample points currently retained.
        runs: Number of runs of residuals with same sign.
        span: Time interval between oldest and newest samples (seconds).
        std_dev: Estimated sample standard deviation (seconds).
        resid_freq: Residual frequency (parts per million).
        skew: Frequency skew (error bound) in ppm.
        offset: Estimated offset of the source (seconds).
        offset_err: Offset error bound (seconds).

    See Also:
        `ChronyConnection.get_source_stats`: Method to retrieve statistics.
    """

    reference_id: int
    address: str
    samples: int
    runs: int
    span: int
    std_dev: float
    resid_freq: float
    skew: float
    offset: float
    offset_err: float

    def has_sufficient_samples(self, minimum: int = 4) -> bool:
        """Check if enough samples exist for reliable statistics.

        Args:
            minimum: Minimum number of samples required (default 4).

        Returns:
            True if samples >= minimum.

        Examples:
            >>> with ChronyConnection() as conn:
            ...     for stats in conn.get_source_stats():
            ...         if stats.has_sufficient_samples(8):
            ...             print(f"{stats.address}: offset={stats.offset:.6f}s")
        """
        return self.samples >= minimum

has_sufficient_samples(minimum=4)

Check if enough samples exist for reliable statistics.

Parameters:

Name Type Description Default
minimum int

Minimum number of samples required (default 4).

4

Returns:

Type Description
bool

True if samples >= minimum.

Examples:

>>> with ChronyConnection() as conn:
...     for stats in conn.get_source_stats():
...         if stats.has_sufficient_samples(8):
...             print(f"{stats.address}: offset={stats.offset:.6f}s")
Source code in src/pychrony/models.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
def has_sufficient_samples(self, minimum: int = 4) -> bool:
    """Check if enough samples exist for reliable statistics.

    Args:
        minimum: Minimum number of samples required (default 4).

    Returns:
        True if samples >= minimum.

    Examples:
        >>> with ChronyConnection() as conn:
        ...     for stats in conn.get_source_stats():
        ...         if stats.has_sufficient_samples(8):
        ...             print(f"{stats.address}: offset={stats.offset:.6f}s")
    """
    return self.samples >= minimum

RTCData dataclass

Chrony RTC (Real-Time Clock) data.

Represents information about the hardware RTC and its relationship to system time, as tracked by chronyd.

Note: RTC tracking must be enabled in chronyd configuration. If not enabled, get_rtc_data() returns None.

Attributes:

Name Type Description
ref_time float

RTC reading at last error measurement (seconds since epoch).

samples int

Number of previous measurements used for calibration.

runs int

Number of runs of residuals (indicates linear model fit quality).

span int

Time period covered by measurements (seconds).

offset float

Estimated RTC offset (fast by) in seconds.

freq_offset float

RTC frequency offset (drift rate) in parts per million.

See Also

ChronyConnection.get_rtc_data: Method to retrieve RTC data.

Source code in src/pychrony/models.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
@dataclass(frozen=True)
class RTCData:
    """Chrony RTC (Real-Time Clock) data.

    Represents information about the hardware RTC and its relationship
    to system time, as tracked by chronyd.

    Note: RTC tracking must be enabled in chronyd configuration.
    If not enabled, `get_rtc_data()` returns ``None``.

    Attributes:
        ref_time: RTC reading at last error measurement (seconds since epoch).
        samples: Number of previous measurements used for calibration.
        runs: Number of runs of residuals (indicates linear model fit quality).
        span: Time period covered by measurements (seconds).
        offset: Estimated RTC offset (fast by) in seconds.
        freq_offset: RTC frequency offset (drift rate) in parts per million.

    See Also:
        `ChronyConnection.get_rtc_data`: Method to retrieve RTC data.
    """

    ref_time: float
    samples: int
    runs: int
    span: int
    offset: float
    freq_offset: float

    def is_calibrated(self) -> bool:
        """Check if RTC has enough calibration data.

        Returns:
            True if samples > 0 (some calibration exists).

        Examples:
            >>> with ChronyConnection() as conn:
            ...     rtc = conn.get_rtc_data()
            ...     if rtc and rtc.is_calibrated():
            ...         print(f"RTC drift: {rtc.freq_offset:.2f} ppm")
        """
        return self.samples > 0

is_calibrated()

Check if RTC has enough calibration data.

Returns:

Type Description
bool

True if samples > 0 (some calibration exists).

Examples:

>>> with ChronyConnection() as conn:
...     rtc = conn.get_rtc_data()
...     if rtc and rtc.is_calibrated():
...         print(f"RTC drift: {rtc.freq_offset:.2f} ppm")
Source code in src/pychrony/models.py
335
336
337
338
339
340
341
342
343
344
345
346
347
def is_calibrated(self) -> bool:
    """Check if RTC has enough calibration data.

    Returns:
        True if samples > 0 (some calibration exists).

    Examples:
        >>> with ChronyConnection() as conn:
        ...     rtc = conn.get_rtc_data()
        ...     if rtc and rtc.is_calibrated():
        ...         print(f"RTC drift: {rtc.freq_offset:.2f} ppm")
    """
    return self.samples > 0

Enums

Categorical values for status fields.

LeapStatus

Bases: Enum

Leap second status for NTP synchronization.

Indicates whether time is normal or if a leap second adjustment is scheduled at the next midnight UTC.

Attributes:

Name Type Description
NORMAL

No leap second pending.

INSERT

Leap second will be inserted at midnight (23:59:60).

DELETE

Leap second will be deleted at midnight (skip 23:59:59).

UNSYNC

Clock is unsynchronized.

Examples:

>>> from pychrony import ChronyConnection, LeapStatus
>>> with ChronyConnection() as conn:
...     status = conn.get_tracking()
...     if status.leap_status == LeapStatus.INSERT:
...         print("Leap second insertion scheduled")
...     elif status.leap_status == LeapStatus.UNSYNC:
...         print("Clock not synchronized")
Source code in src/pychrony/models.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class LeapStatus(Enum):
    """Leap second status for NTP synchronization.

    Indicates whether time is normal or if a leap second adjustment
    is scheduled at the next midnight UTC.

    Attributes:
        NORMAL: No leap second pending.
        INSERT: Leap second will be inserted at midnight (23:59:60).
        DELETE: Leap second will be deleted at midnight (skip 23:59:59).
        UNSYNC: Clock is unsynchronized.

    Examples:
        >>> from pychrony import ChronyConnection, LeapStatus
        >>> with ChronyConnection() as conn:
        ...     status = conn.get_tracking()
        ...     if status.leap_status == LeapStatus.INSERT:
        ...         print("Leap second insertion scheduled")
        ...     elif status.leap_status == LeapStatus.UNSYNC:
        ...         print("Clock not synchronized")
    """

    NORMAL = 0
    INSERT = 1
    DELETE = 2
    UNSYNC = 3

SourceState

Bases: Enum

Selection state of a chrony time source.

Indicates whether chrony has selected, rejected, or is considering this source for time synchronization.

Attributes:

Name Type Description
SELECTED

Currently selected for synchronization.

NONSELECTABLE

Cannot be selected (bad measurements).

FALSETICKER

Detected as providing incorrect time.

JITTERY

Measurements have excessive jitter.

UNSELECTED

Valid but not currently selected.

SELECTABLE

Candidate for selection.

Examples:

>>> from pychrony import ChronyConnection, SourceState
>>> with ChronyConnection() as conn:
...     for src in conn.get_sources():
...         if src.state == SourceState.FALSETICKER:
...             print(f"Warning: {src.address} detected as falseticker")
...         elif src.state == SourceState.SELECTED:
...             print(f"Active source: {src.address}")
Source code in src/pychrony/models.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class SourceState(Enum):
    """Selection state of a chrony time source.

    Indicates whether chrony has selected, rejected, or is
    considering this source for time synchronization.

    Attributes:
        SELECTED: Currently selected for synchronization.
        NONSELECTABLE: Cannot be selected (bad measurements).
        FALSETICKER: Detected as providing incorrect time.
        JITTERY: Measurements have excessive jitter.
        UNSELECTED: Valid but not currently selected.
        SELECTABLE: Candidate for selection.

    Examples:
        >>> from pychrony import ChronyConnection, SourceState
        >>> with ChronyConnection() as conn:
        ...     for src in conn.get_sources():
        ...         if src.state == SourceState.FALSETICKER:
        ...             print(f"Warning: {src.address} detected as falseticker")
        ...         elif src.state == SourceState.SELECTED:
        ...             print(f"Active source: {src.address}")
    """

    SELECTED = 0
    NONSELECTABLE = 1
    FALSETICKER = 2
    JITTERY = 3
    UNSELECTED = 4
    SELECTABLE = 5

SourceMode

Bases: Enum

Operational mode of a chrony time source.

Distinguishes between NTP client connections, peer relationships, and local reference clocks.

Attributes:

Name Type Description
CLIENT

NTP client polling a server.

PEER

NTP peer relationship (bidirectional).

REFCLOCK

Local reference clock (GPS, PPS, etc.).

Examples:

>>> from pychrony import ChronyConnection, SourceMode
>>> with ChronyConnection() as conn:
...     for src in conn.get_sources():
...         if src.mode == SourceMode.REFCLOCK:
...             print(f"Reference clock: {src.address}")
...         elif src.mode == SourceMode.CLIENT:
...             print(f"NTP server: {src.address}")
Source code in src/pychrony/models.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
class SourceMode(Enum):
    """Operational mode of a chrony time source.

    Distinguishes between NTP client connections, peer
    relationships, and local reference clocks.

    Attributes:
        CLIENT: NTP client polling a server.
        PEER: NTP peer relationship (bidirectional).
        REFCLOCK: Local reference clock (GPS, PPS, etc.).

    Examples:
        >>> from pychrony import ChronyConnection, SourceMode
        >>> with ChronyConnection() as conn:
        ...     for src in conn.get_sources():
        ...         if src.mode == SourceMode.REFCLOCK:
        ...             print(f"Reference clock: {src.address}")
        ...         elif src.mode == SourceMode.CLIENT:
        ...             print(f"NTP server: {src.address}")
    """

    CLIENT = 0
    PEER = 1
    REFCLOCK = 2

Exceptions

Exception hierarchy for error handling.

ChronyError

Bases: Exception

Base exception for all chrony-related errors.

Attributes:

Name Type Description
message

Human-readable error description

error_code

Optional numeric error code from libchrony

Source code in src/pychrony/exceptions.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class ChronyError(Exception):
    """Base exception for all chrony-related errors.

    Attributes:
        message: Human-readable error description
        error_code: Optional numeric error code from libchrony
    """

    def __init__(self, message: str, error_code: Optional[int] = None):
        super().__init__(message)
        self.message = message
        self.error_code = error_code

    def __str__(self) -> str:
        if self.error_code is not None:
            return f"{self.message} (error code: {self.error_code})"
        return self.message

ChronyConnectionError

Bases: ChronyError

Raised when connection to chronyd fails.

Common causes:

  • chronyd is not running
  • Socket path does not exist
  • chrony_open_socket() returns < 0
  • chrony_init_session() returns error

Examples:

>>> from pychrony import ChronyConnection, ChronyConnectionError
>>> try:
...     with ChronyConnection() as conn:
...         status = conn.get_tracking()
... except ChronyConnectionError as e:
...     print(f"Connection failed: {e}")
Source code in src/pychrony/exceptions.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class ChronyConnectionError(ChronyError):
    """Raised when connection to chronyd fails.

    Common causes:

    - chronyd is not running
    - Socket path does not exist
    - ``chrony_open_socket()`` returns < 0
    - ``chrony_init_session()`` returns error

    Examples:
        >>> from pychrony import ChronyConnection, ChronyConnectionError
        >>> try:
        ...     with ChronyConnection() as conn:
        ...         status = conn.get_tracking()
        ... except ChronyConnectionError as e:
        ...     print(f"Connection failed: {e}")
    """

    pass

ChronyPermissionError

Bases: ChronyError

Raised when access to chronyd is denied.

Common causes:

  • User not in chrony group
  • Running as unprivileged user
  • SELinux/AppArmor restrictions

Examples:

>>> from pychrony import ChronyConnection, ChronyPermissionError
>>> try:
...     with ChronyConnection() as conn:
...         status = conn.get_tracking()
... except ChronyPermissionError as e:
...     print(f"Permission denied: {e}")
...     print("Add user to chrony group or run as root")
Source code in src/pychrony/exceptions.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class ChronyPermissionError(ChronyError):
    """Raised when access to chronyd is denied.

    Common causes:

    - User not in chrony group
    - Running as unprivileged user
    - SELinux/AppArmor restrictions

    Examples:
        >>> from pychrony import ChronyConnection, ChronyPermissionError
        >>> try:
        ...     with ChronyConnection() as conn:
        ...         status = conn.get_tracking()
        ... except ChronyPermissionError as e:
        ...     print(f"Permission denied: {e}")
        ...     print("Add user to chrony group or run as root")
    """

    pass

ChronyDataError

Bases: ChronyError

Raised when tracking data is invalid or incomplete.

Common causes:

  • chrony_get_field_index() returns < 0 (field not found)
  • chrony_process_response() returns error
  • Field validation fails (NaN, out of range)
  • Protocol version mismatch

Examples:

>>> from pychrony import ChronyConnection, ChronyDataError
>>> with ChronyConnection() as conn:
...     try:
...         status = conn.get_tracking()
...     except ChronyDataError as e:
...         print(f"Invalid data: {e}")
Source code in src/pychrony/exceptions.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class ChronyDataError(ChronyError):
    """Raised when tracking data is invalid or incomplete.

    Common causes:

    - ``chrony_get_field_index()`` returns < 0 (field not found)
    - ``chrony_process_response()`` returns error
    - Field validation fails (NaN, out of range)
    - Protocol version mismatch

    Examples:
        >>> from pychrony import ChronyConnection, ChronyDataError
        >>> with ChronyConnection() as conn:
        ...     try:
        ...         status = conn.get_tracking()
        ...     except ChronyDataError as e:
        ...         print(f"Invalid data: {e}")
    """

    pass

ChronyLibraryError

Bases: ChronyError

Raised when libchrony is not available.

Common causes:

  • libchrony not installed at runtime
  • CFFI bindings not compiled (missing libchrony-devel at build time)
  • Library version incompatible

Examples:

>>> from pychrony import ChronyConnection, ChronyLibraryError
>>> try:
...     with ChronyConnection() as conn:
...         status = conn.get_tracking()
... except ChronyLibraryError as e:
...     print(f"Library not available: {e}")
...     print("Install libchrony-devel and rebuild")
Source code in src/pychrony/exceptions.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
class ChronyLibraryError(ChronyError):
    """Raised when libchrony is not available.

    Common causes:

    - libchrony not installed at runtime
    - CFFI bindings not compiled (missing libchrony-devel at build time)
    - Library version incompatible

    Examples:
        >>> from pychrony import ChronyConnection, ChronyLibraryError
        >>> try:
        ...     with ChronyConnection() as conn:
        ...         status = conn.get_tracking()
        ... except ChronyLibraryError as e:
        ...     print(f"Library not available: {e}")
        ...     print("Install libchrony-devel and rebuild")
    """

    def __init__(self, message: str):
        super().__init__(message, error_code=None)

Testing Utilities

Factory functions and pytest fixtures for testing code that uses pychrony.

testing

Testing utilities for pychrony.

This module provides factory functions and pytest fixtures for creating test instances of pychrony dataclasses with sensible defaults.

Factory Functions (for any test framework): from pychrony.testing import make_tracking, make_source status = make_tracking(stratum=3, offset=-0.001)

Pytest Fixtures (auto-discovered via plugin): def test_something(tracking_status, source): assert tracking_status.is_synchronized()

make_tracking(**overrides)

Create a TrackingStatus instance with sensible defaults.

Default state is synchronized (reference_id != 0, stratum=2).

Parameters:

Name Type Description Default
**overrides Any

Field values to override defaults

{}

Returns:

Type Description
TrackingStatus

TrackingStatus instance

Examples:

>>> make_tracking()  # Synchronized status
>>> make_tracking(stratum=16, reference_id=0)  # Unsynchronized
>>> make_tracking(leap_status=LeapStatus.INSERT)  # Leap pending
Source code in src/pychrony/testing.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def make_tracking(**overrides: Any) -> TrackingStatus:
    """Create a TrackingStatus instance with sensible defaults.

    Default state is synchronized (reference_id != 0, stratum=2).

    Args:
        **overrides: Field values to override defaults

    Returns:
        TrackingStatus instance

    Examples:
        >>> make_tracking()  # Synchronized status
        >>> make_tracking(stratum=16, reference_id=0)  # Unsynchronized
        >>> make_tracking(leap_status=LeapStatus.INSERT)  # Leap pending
    """
    return TrackingStatus(**{**TRACKING_DEFAULTS, **overrides})

make_source(**overrides)

Create a Source instance with sensible defaults.

Default state is selected and reachable.

Parameters:

Name Type Description Default
**overrides Any

Field values to override defaults.

{}

Returns:

Type Description
Source

Source instance.

Examples:

>>> make_source()  # Selected, reachable source
>>> make_source(state=SourceState.FALSETICKER)  # Falseticker
>>> make_source(reachability=0)  # Unreachable source
Source code in src/pychrony/testing.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def make_source(**overrides: Any) -> Source:
    """Create a Source instance with sensible defaults.

    Default state is selected and reachable.

    Args:
        **overrides: Field values to override defaults.

    Returns:
        Source instance.

    Examples:
        >>> make_source()  # Selected, reachable source
        >>> make_source(state=SourceState.FALSETICKER)  # Falseticker
        >>> make_source(reachability=0)  # Unreachable source
    """
    return Source(**{**SOURCE_DEFAULTS, **overrides})

make_source_stats(**overrides)

Create a SourceStats instance with sensible defaults.

Default has 8 samples (sufficient for statistics).

Parameters:

Name Type Description Default
**overrides Any

Field values to override defaults.

{}

Returns:

Type Description
SourceStats

SourceStats instance.

Examples:

>>> make_source_stats()  # Stats with 8 samples
>>> make_source_stats(samples=2)  # Insufficient samples
>>> make_source_stats(offset=0.001)  # Custom offset
Source code in src/pychrony/testing.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def make_source_stats(**overrides: Any) -> SourceStats:
    """Create a SourceStats instance with sensible defaults.

    Default has 8 samples (sufficient for statistics).

    Args:
        **overrides: Field values to override defaults.

    Returns:
        SourceStats instance.

    Examples:
        >>> make_source_stats()  # Stats with 8 samples
        >>> make_source_stats(samples=2)  # Insufficient samples
        >>> make_source_stats(offset=0.001)  # Custom offset
    """
    return SourceStats(**{**SOURCESTATS_DEFAULTS, **overrides})

make_rtc_data(**overrides)

Create an RTCData instance with sensible defaults.

Default is calibrated (samples > 0).

Parameters:

Name Type Description Default
**overrides Any

Field values to override defaults.

{}

Returns:

Type Description
RTCData

RTCData instance.

Examples:

>>> make_rtc_data()  # Calibrated RTC
>>> make_rtc_data(samples=0)  # Uncalibrated RTC
>>> make_rtc_data(freq_offset=-5.0)  # Custom drift rate
Source code in src/pychrony/testing.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def make_rtc_data(**overrides: Any) -> RTCData:
    """Create an RTCData instance with sensible defaults.

    Default is calibrated (samples > 0).

    Args:
        **overrides: Field values to override defaults.

    Returns:
        RTCData instance.

    Examples:
        >>> make_rtc_data()  # Calibrated RTC
        >>> make_rtc_data(samples=0)  # Uncalibrated RTC
        >>> make_rtc_data(freq_offset=-5.0)  # Custom drift rate
    """
    return RTCData(**{**RTCDATA_DEFAULTS, **overrides})

tracking_status()

Fixture providing a synchronized TrackingStatus with defaults.

Source code in src/pychrony/testing.py
173
174
175
176
@pytest.fixture
def tracking_status() -> TrackingStatus:
    """Fixture providing a synchronized TrackingStatus with defaults."""
    return make_tracking()

source()

Fixture providing a selected, reachable Source with defaults.

Source code in src/pychrony/testing.py
179
180
181
182
@pytest.fixture
def source() -> Source:
    """Fixture providing a selected, reachable Source with defaults."""
    return make_source()

source_stats()

Fixture providing a SourceStats with sufficient samples.

Source code in src/pychrony/testing.py
185
186
187
188
@pytest.fixture
def source_stats() -> SourceStats:
    """Fixture providing a SourceStats with sufficient samples."""
    return make_source_stats()

rtc_data()

Fixture providing a calibrated RTCData with defaults.

Source code in src/pychrony/testing.py
191
192
193
194
@pytest.fixture
def rtc_data() -> RTCData:
    """Fixture providing a calibrated RTCData with defaults."""
    return make_rtc_data()