diff --git a/canopen/node/base.py b/canopen/node/base.py index 45ad35b4..80129e7a 100644 --- a/canopen/node/base.py +++ b/canopen/node/base.py @@ -1,4 +1,4 @@ -from typing import TextIO, Union +from typing import Optional, TextIO, Union import canopen.network from canopen.objectdictionary import ObjectDictionary, import_od @@ -25,7 +25,7 @@ def __init__( object_dictionary = import_od(object_dictionary, node_id) self.object_dictionary = object_dictionary - self.id = node_id or self.object_dictionary.node_id + self.id: Optional[int] = node_id or self.object_dictionary.node_id def has_network(self) -> bool: """Check whether the node has been associated to a network.""" diff --git a/canopen/node/local.py b/canopen/node/local.py index 886d8ac8..f91b8b20 100644 --- a/canopen/node/local.py +++ b/canopen/node/local.py @@ -24,6 +24,8 @@ def __init__( object_dictionary: Union[ObjectDictionary, str], ): super(LocalNode, self).__init__(node_id, object_dictionary) + assert self.id is not None, "node_id must be specified" + self.id: int self.data_store: dict[int, dict[int, bytes]] = {} self._read_callbacks = [] diff --git a/canopen/node/remote.py b/canopen/node/remote.py index 371f784c..4e6869ea 100644 --- a/canopen/node/remote.py +++ b/canopen/node/remote.py @@ -35,6 +35,8 @@ def __init__( load_od: bool = False, ): super(RemoteNode, self).__init__(node_id, object_dictionary) + assert self.id is not None, "node_id must be specified" + self.id: int #: Enable WORKAROUND for reversed PDO mapping entries self.curtis_hack = False diff --git a/canopen/pdo/base.py b/canopen/pdo/base.py index 96ad057a..c39b5c4f 100644 --- a/canopen/pdo/base.py +++ b/canopen/pdo/base.py @@ -592,7 +592,7 @@ def remote_request(self) -> None: if self.enabled and self.rtr_allowed and self.cob_id: self.pdo_node.network.send_message(self.cob_id, bytes(), remote=True) - def wait_for_reception(self, timeout: float = 10) -> float: + def wait_for_reception(self, timeout: float = 10) -> Optional[float]: """Wait for the next transmit PDO. :param float timeout: Max time to wait in seconds. @@ -620,6 +620,8 @@ def get_data(self) -> bytes: :return: PdoVariable value as :class:`bytes`. """ + assert self.offset is not None, "PdoVariable offset not set" + assert self.pdo_parent is not None, "PdoVariable has no parent map" byte_offset, bit_offset = divmod(self.offset, 8) if bit_offset or self.length % 8: @@ -647,6 +649,8 @@ def set_data(self, data: bytes): :param data: Value for the PDO variable in the PDO message. """ + assert self.offset is not None, "PdoVariable offset not set" + assert self.pdo_parent is not None, "PdoVariable has no parent map" byte_offset, bit_offset = divmod(self.offset, 8) logger.debug("Updating %s to %s in %s", self.name, binascii.hexlify(data), self.pdo_parent.name)