Commit 0fe6b65e authored by Juraj Sloboda's avatar Juraj Sloboda Committed by Matej Feder

Resolve review comments

parent 8337d8ef
......@@ -30,6 +30,8 @@ This is the PoC (Proof of Concept) implementation of:
# Installation
Required Python version: 3.7.3+
#### pydtn library
```
pip3 install pydtn/
......
......@@ -23,25 +23,24 @@ def load_config():
args = parser.parse_args()
# forbid using both formats at once
assert args.yaml == "" or args.json == ""
assert not (args.yaml and args.json)
if args.yaml != "":
yaml_config = yaml.safe_load(args.yaml)
config = {}
if args.yaml:
config = yaml.safe_load(args.yaml)
logger.debug(f"command line --yaml has value:\n{args.yaml}")
return yaml_config
if args.json != "":
json_config = json.loads(args.json)
if args.json:
config = json.loads(args.json)
logger.debug(f"command line --json has value:\n{args.json}")
return json_config
return {}
return config
def start():
config = load_config()
# Start example application that prints received bundle payloads
app.start(config)
asyncio.run(pydtn_rest.start(config))
......
......@@ -12,4 +12,5 @@ async def recv_callback(adu, adu_len, app_eid, src_eid):
def start(config):
"""Start example application that prints received bundle payloads"""
bp_agent.register(config["app-eid"], recv_callback)
......@@ -20,7 +20,6 @@ from ..util.exceptions import AlreadyRegisteredError
from ..util.hexdump import hexdump
from . import convergence_layers
from .convergence_layers.api import ConnectFailedError
from .conn_manager import ConnectionManager, NoNeighborForEIDError
from . import misc
......@@ -58,9 +57,6 @@ class BPAgent:
Args:
node_config (dict): BPAgent configuration
Returns:
None
"""
self._node_id = node_config["id"]
self._date_time_format = node_config["date-time-format"]
......@@ -99,9 +95,6 @@ class BPAgent:
Args:
adapter (ConvergenceLayerAdapter): Convergence Layer Adapter to register
Returns:
None
"""
self._conn_manager.register_cl_adapter(adapter)
......@@ -111,18 +104,11 @@ class BPAgent:
Args:
neighbor (ConvergenceLayerNeighbor): CLA-specific information about how to
connect to particular neighbor
Returns:
None
"""
self._conn_manager.register_neighbor(neighbor)
async def run_forever(self):
"""Block and process incoming data
Returns:
None
"""
"""Block and process incoming data"""
await self._conn_manager.run_forever()
def register(self, eid, recv_callback=None):
......@@ -164,9 +150,6 @@ class BPAgent:
start_time (float): Start time in seconds since epoch
end_time (float): End time in seconds since epoch
Returns:
None
Raises:
ValueError: If start_time is not less than end_time
"""
......@@ -215,10 +198,7 @@ class BPAgent:
Args:
via_eid (str): EID of neighbor to which send the traffic when destination EID matches
dst_eid (str): EID of destination node
Returns:
None
dst_eid (str,optional): EID of destination node. Default is None which matches all bundles
Raises:
AlreadyRegisteredError: If specified route is already registered
......@@ -242,14 +222,13 @@ class BPAgent:
return via
logger.debug(f'BP Agent didn\'t find route to "{target_eid}"')
return None
async def _send_bundle(self, dst_eid, bundle):
try:
await self._conn_manager.try_to_send(dst_eid, bundle)
except NoNeighborForEIDError:
logger.info(f'BP Agent doesn\'t know how to connect to "{dst_eid}" node')
except ConnectFailedError:
except convergence_layers.api.ConnectFailedError:
logger.info(f'BP Agent failed to connect to "{dst_eid}" node')
async def _create_and_send_bundle_msg(self, source, destination, payload):
......@@ -284,9 +263,6 @@ class BPAgent:
destination (str): Destination EID
payload (bytes, bytearray): Payload data
Returns:
None
Raises:
NoContactError: If there is no scheduled contact to route
to specified destination EID
......@@ -308,9 +284,6 @@ class BPAgent:
Assumes the message will be received by uPCN node without
delay, otherwise start_offset and end_offset can't be
correctly interpreted.
Returns:
None
"""
logger.info("BP Agent received request to send uPCN config message")
......@@ -324,9 +297,6 @@ class BPAgent:
Args:
conn (ConvergenceLayerConnection): Convergence-layer-specific connection
Returns:
None
"""
logger.info("BP Agent is receiving bundle")
last_bundle_segment = False
......@@ -414,8 +384,5 @@ class BPPrivateAPI:
Args:
destination (str): Endpoint ID of destination node(-s)
payload (Union[bytes, bytearray]): Payload data
Returns:
None
"""
await self._bp_agent.create_and_send_bundle_msg(self._app_singleton_eid, destination, payload)
......@@ -39,9 +39,6 @@ class ConnectionManager:
Args:
adapter (ConvergenceLayerAdapter): Convergence Layer Adapter to register
Returns:
None
Raises:
ValueError: If adapter is not instance of ConvergenceLayerAdapter
AlreadyRegisteredError: If adapter is already registered
......@@ -58,9 +55,6 @@ class ConnectionManager:
Args:
neighbor (ConvergenceLayerNeighbor): CLA-specific information about how to
connect to particular neighbor
Returns:
None
"""
for adapter in self._cl_adapter_instances:
if isinstance(adapter, neighbor.cla_class):
......@@ -89,11 +83,7 @@ class ConnectionManager:
) # TODO: Don't automatically close connection after one bundle has been received
async def run_forever(self):
"""Block and process incoming data
Returns:
None
"""
"""Block and process incoming data"""
await asyncio.wait(
[
asyncio.create_task(cl_adapter.run_forever(self._request_handler))
......@@ -102,13 +92,24 @@ class ConnectionManager:
)
async def _connect(self, remote_eid):
"""Try to establish connection to node with specified EID
If some nodes have this information but all attempts to connect to the node fail then ConnectFailedError should be raised. Otherwise new connection object should be returned.
Args:
remote_eid: EID of node to connect to
Returns:
ConvergenceLayerConnection: Established connection
Raises:
NoNeighborForEIDError: If no CLA adapter has any information how to connect to node with specified EID
ConnectFailedError: If every attempt to connect to node with specified EID (using all available means) fails
"""
logger.info(f'ConnectionManager: Connecting to "{remote_eid}" node')
some_conn_failed = False
for adapter in self._cl_adapter_instances:
if any(adapter.get_neighbors_for_eid(remote_eid)):
for nbr in adapter.get_neighbors_for_eid(remote_eid):
neighbor = nbr
break
for neighbor in adapter.get_neighbors_for_eid(remote_eid):
try:
conn = await adapter.connect(neighbor)
except ConnectFailedError:
......@@ -124,7 +125,7 @@ class ConnectionManager:
def _pick_a_connection(self):
if not any(self._connections):
return None
return
return self._connections[0]
async def try_to_send(self, dst_eid, bundle):
......@@ -134,9 +135,6 @@ class ConnectionManager:
dst_eid (str): EID of node to send bundle to
bundle (Union[bytes, bytearray]): Bundle to send
Returns:
None
Raises:
NoNeighborForEIDError: No neighbor with specified EID found
ConnectFailedError: Connection failed for all options to
......
......@@ -134,11 +134,7 @@ class ConvergenceLayerConnection(ABC):
@abstractmethod
async def start(self):
"""Start the communication using CLA-specific protocol
Returns:
None
"""
"""Start the communication using CLA-specific protocol"""
@abstractmethod
async def send(self, data, bundle_start=True, bundle_end=True):
......@@ -148,9 +144,6 @@ class ConvergenceLayerConnection(ABC):
data (Union[bytes, bytearray]): Data to send
bundle_start (bool): Data is prefix of serialized bundle
bundle_end (bool): Data is suffix of serialized bundle
Returns:
None
"""
@abstractmethod
......@@ -166,8 +159,4 @@ class ConvergenceLayerConnection(ABC):
@abstractmethod
def close(self):
"""Close connection
Returns:
None
"""
"""Close connection"""
......@@ -46,9 +46,5 @@ class StreamedConvergenceLayerConnection(ConvergenceLayerConnection):
self._reader.unread(data)
def close(self):
"""Close stream
Returns:
None
"""
"""Close stream"""
self._writer.close()
......@@ -76,7 +76,7 @@ class TCPCL(ConvergenceLayerAdapter):
await conn.start()
await self._upper_layer_handler(conn)
except Exception:
logger.exception("Exception occurred inside _request_handler")
logger.exception("Exception occurred while handling incoming data")
finally:
conn.close() # TODO: Don't automatically close connection after one bundle has been received
......@@ -87,9 +87,6 @@ class TCPCL(ConvergenceLayerAdapter):
upper_layer_handler: Callback to inform upper layer when
new bundle is starting to be received and there is already
some data that can be processed by upper layer
Returns:
None
"""
self._upper_layer_handler = upper_layer_handler
logger.info(f"TCPCL: listening on {self._host}:{self._port}")
......@@ -97,6 +94,18 @@ class TCPCL(ConvergenceLayerAdapter):
logger.info("TCPCL: exited")
class ContactMessageError(Exception):
"""Raised when received contact message has wrong format"""
class InvalidMessageTypeError(Exception):
"""Raised when received message type code is invalid"""
class ProtocolError(Exception):
"""Raised when received data has wrong format"""
class TCPCLConnection(StreamedConvergenceLayerConnection):
"""TCPCL-specific connection"""
......@@ -106,7 +115,7 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
value, bytes_decoded = sdnv_decode(buffer=data)
except ValueError:
logger.info("Error reading SDNV: Unexpected end of input!")
raise self.ProtocolError
raise ProtocolError
self._unrecv(data[bytes_decoded:])
return value
......@@ -115,20 +124,11 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
# logger.debug(f'TCPCL: Sending contact header:\n {contact_msg}')
await self._send(contact_msg)
class ContactMessageError(Exception):
"""Raised when received contact message has wrong format"""
class InvalidMessageTypeError(Exception):
"""Raised when received message type code is invalid"""
class ProtocolError(Exception):
"""Raised when received data has wrong format"""
async def _recv_contact_msg(self):
data = await self._recv_exact(len(TCPCLObjDef.MAGIC))
if data != TCPCLObjDef.MAGIC:
logger.info("Got wrong MAGIC in contact message!")
raise self.ContactMessageError
raise ContactMessageError
# Skip directly to Node ID
await self._recv_exact(4)
......@@ -139,7 +139,7 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
len_of_payload, bytes_decoded = sdnv_decode(buffer=data)
except ValueError:
logger.info("Unexpected end of contact message!")
raise self.ContactMessageError
raise ContactMessageError
self._unrecv(data[bytes_decoded:])
data = await self._recv_exact(len_of_payload)
......@@ -155,9 +155,6 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
Conforms to::
https://tools.ietf.org/html/rfc7242#section-5
Returns:
None
"""
data = bytearray()
data.append(msg_type << 4 | msg_flags)
......@@ -169,9 +166,6 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
Conforms to::
https://tools.ietf.org/html/rfc7242#section-5
Returns:
None
"""
header = await self._recv(1)
msg_type = MessageType(header[0] >> 4)
......@@ -204,18 +198,15 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
elif msg_type == MessageType.LENGTH:
content = await self._recv_sdnv() # total_bundle_len
else:
raise self.InvalidMessageTypeError
raise InvalidMessageTypeError
return msg_type, msg_flags, content
async def start(self):
"""Start the communication by exchanging contact messages with the peer
Returns:
None
Raises:
TCPCLConnection.ContactMessageError: When received contact message has wrong format
ContactMessageError: When received contact message has wrong format
"""
await self._send_contact_msg()
await self._recv_contact_msg()
......@@ -231,9 +222,6 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
data (Union[bytes, bytearray]): Data to send
bundle_start (bool): Data is prefix of serialized bundle
bundle_end (bool): Data is suffix of serialized bundle
Returns:
None
"""
content_bytes = bytearray()
content_bytes += sdnv_encode(len(data))
......@@ -253,8 +241,8 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
Data is suffix of serialized bundle)
Raises:
TCPCLConnection.InvalidMessageTypeError: When received message type code is invalid
TCPCLConnection.ProtocolError: When received data has wrong format
InvalidMessageTypeError: When received message type code is invalid
ProtocolError: When received data has wrong format
"""
while True:
msg_type, msg_flags, content = await self._recv_segment()
......@@ -267,10 +255,6 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
return content, first_bundle_segment, last_bundle_segment
def close(self):
"""Close connection
Returns:
None
"""
"""Close connection"""
logger.info(f"TCPCL: Closing connection")
super().close()
......@@ -32,9 +32,6 @@ async def start(config=None):
Args:
config: Configuration of Bundle protocol agent
Returns:
None
"""
config = load_config(config)
......
......@@ -35,8 +35,5 @@ class AIOScheduler:
Args:
date_time (float): Number of second since epoch
coro (coroutine): Job to schedule
Returns:
None
"""
self._call_at(date_time, coro)
"""Commonly used exceptions"""
class Error(Exception):
"""Generic error."""
class AlreadyRegisteredError(Error):
class AlreadyRegisteredError(Exception):
"""Error returned by registration functions when attempting to register already registered object."""
class ParsingError(Error):
class ParsingError(Exception):
"""Error that occurred during parsing."""
......@@ -41,9 +41,6 @@ class WritebackReader:
Args:
data: Data to put back to read buffer
Returns:
None
"""
self._read_buffer = data + self._read_buffer
......
......@@ -3,8 +3,6 @@
import struct
import binascii
from .exceptions import Error
def bit_is_set(number, bit):
return bool(number & (1 << bit))
......@@ -22,7 +20,7 @@ class IntField:
elif size == 16:
fmt = "!H"
else:
raise Error("Unsupported size: %s" % size)
raise ValueError("Unsupported size: %s" % size)
self._value, = struct.unpack(fmt, value.encode())
else:
......
......@@ -108,9 +108,6 @@ async def start(config=None):
Args:
config: Configuration of Bundle protocol agent and REST endpoint
Returns:
None
"""
await asyncio.gather(asyncio.create_task(pydtn.start(config)), asyncio.create_task(_start(config)))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment