Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions canopen/node/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TextIO, Union
from typing import Optional, TextIO, Union

import canopen.network
from canopen.objectdictionary import ObjectDictionary, import_od
Expand All @@ -25,7 +25,7 @@ def __init__(
object_dictionary = import_od(object_dictionary, node_id)
self.object_dictionary = object_dictionary

self.id = node_id or self.object_dictionary.node_id
self.id: Optional[int] = node_id or self.object_dictionary.node_id

def has_network(self) -> bool:
"""Check whether the node has been associated to a network."""
Expand Down
2 changes: 2 additions & 0 deletions canopen/node/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def __init__(
object_dictionary: Union[ObjectDictionary, str],
):
super(LocalNode, self).__init__(node_id, object_dictionary)
assert self.id is not None, "node_id must be specified"
self.id: int

self.data_store: dict[int, dict[int, bytes]] = {}
self._read_callbacks = []
Expand Down
2 changes: 2 additions & 0 deletions canopen/node/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ def __init__(
load_od: bool = False,
):
super(RemoteNode, self).__init__(node_id, object_dictionary)
assert self.id is not None, "node_id must be specified"
self.id: int

#: Enable WORKAROUND for reversed PDO mapping entries
self.curtis_hack = False
Expand Down
6 changes: 5 additions & 1 deletion canopen/pdo/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ def remote_request(self) -> None:
if self.enabled and self.rtr_allowed and self.cob_id:
self.pdo_node.network.send_message(self.cob_id, bytes(), remote=True)

def wait_for_reception(self, timeout: float = 10) -> float:
def wait_for_reception(self, timeout: float = 10) -> Optional[float]:
"""Wait for the next transmit PDO.

:param float timeout: Max time to wait in seconds.
Expand Down Expand Up @@ -620,6 +620,8 @@ def get_data(self) -> bytes:

:return: PdoVariable value as :class:`bytes`.
"""
assert self.offset is not None, "PdoVariable offset not set"
assert self.pdo_parent is not None, "PdoVariable has no parent map"
byte_offset, bit_offset = divmod(self.offset, 8)

if bit_offset or self.length % 8:
Expand Down Expand Up @@ -647,6 +649,8 @@ def set_data(self, data: bytes):

:param data: Value for the PDO variable in the PDO message.
"""
assert self.offset is not None, "PdoVariable offset not set"
assert self.pdo_parent is not None, "PdoVariable has no parent map"
byte_offset, bit_offset = divmod(self.offset, 8)
logger.debug("Updating %s to %s in %s",
self.name, binascii.hexlify(data), self.pdo_parent.name)
Expand Down
Loading