Commit 12bda5ae authored by Juraj Sloboda's avatar Juraj Sloboda Committed by Matej Feder

Add docstrings

parent f0a65e81
"""Bundle Protocol Version 7
Conforms to::
https://tools.ietf.org/html/draft-ietf-dtn-bpbis-13
Note:
Services of the Bundle Protocol Agent should be accessed using
bp_agent singleton defined here (or objects returned by it).
"""
from .bp import BPAgent
bp_agent = BPAgent()
This diff is collapsed.
"""Connection Manager
This is a layer between BP Agent and CLAs that manages connections
using available CLAs.
Bundle Protocol layer doesn't know about CLAs
(except for registration interface).
It it responsibility of this layer to decide which underlying CLA
layers to use in specific situations.
"""
import asyncio
import logging
......@@ -8,11 +18,13 @@ from ..util.exceptions import AlreadyRegisteredError
logger = logging.getLogger(__name__)
class NoEndpointsForEIDError(Exception):
pass
class NoNeighborForEIDError(Exception):
"""Raised when there is no neighbor with specified EID"""
class ConnectionManager:
"""Connection Manager"""
def __init__(self, request_handler):
self._cl_adapter_instances = []
self._cl_adapter_classes = []
......@@ -20,16 +32,39 @@ class ConnectionManager:
self._request_handler = request_handler
def register_cl_adapter(self, adapter):
"""Register Convergence Layer Adapter for utilization by Connection Manager
Adapter should be and instance initialized with suitable config
Args:
adapter (ConvergenceLayerAdapter): Convergence Layer Adapter to register
Returns:
None
Raises:
ValueError: If adapter is not instance of ConvergenceLayerAdapter
AlreadyRegisteredError: If adapter is already registered
"""
if not isinstance(adapter, ConvergenceLayerAdapter):
raise ValueError
if adapter in self._cl_adapter_instances:
raise AlreadyRegisteredError
self._cl_adapter_instances.append(adapter)
def register_remote_endpoint(self, remote_endpoint):
def register_neighbor(self, neighbor):
"""Register connection option for particular neighbor
Args:
neighbor (ConvergenceLayerNeighbor): CLA-specific information about how to
connect to particular neighbor
Returns:
None
"""
for adapter in self._cl_adapter_instances:
if isinstance(adapter, remote_endpoint.cla_class):
adapter.register_remote_endpoint(remote_endpoint)
if isinstance(adapter, neighbor.cla_class):
adapter.register_neighbor(neighbor)
def _add_connection(self, conn):
if conn in self._connections:
......@@ -44,7 +79,7 @@ class ConnectionManager:
logger.debug('ConnectionManager: Removing connection from the list of existing connections')
self._connections.remove(conn)
async def request_handler(self, conn):
async def _request_handler(self, conn):
self._add_connection(conn)
try:
await self._request_handler(conn)
......@@ -52,19 +87,24 @@ class ConnectionManager:
self._remove_connection(conn) # TODO: Don't automatically close connection after one bundle has been received
async def run_forever(self):
await asyncio.wait([asyncio.create_task(cl_adapter.run_forever(self.request_handler))
"""Block and process incoming data
Returns:
None
"""
await asyncio.wait([asyncio.create_task(cl_adapter.run_forever(self._request_handler))
for cl_adapter in self._cl_adapter_instances])
async def _connect(self, remote_eid):
logger.info(f'ConnectionManager: Connecting to "{remote_eid}" node')
some_conn_failed = False
for adapter in self._cl_adapter_instances:
if any(adapter.get_remote_endpoints_for_eid(remote_eid)):
for ep in adapter.get_remote_endpoints_for_eid(remote_eid):
remote_endpoint = ep
if any(adapter.get_neighbors_for_eid(remote_eid)):
for nbr in adapter.get_neighbors_for_eid(remote_eid):
neighbor = nbr
break
try:
conn = await adapter.connect(remote_endpoint)
conn = await adapter.connect(neighbor)
except ConnectFailedError:
some_conn_failed = True
logger.info(f'ConnectionManager: Failed to connect to "{remote_eid}" node')
......@@ -74,7 +114,7 @@ class ConnectionManager:
if some_conn_failed:
raise ConnectFailedError
else:
raise NoEndpointsForEIDError
raise NoNeighborForEIDError
def _pick_a_connection(self):
if not any(self._connections):
......@@ -82,6 +122,20 @@ class ConnectionManager:
return self._connections[0]
async def try_to_send(self, dst_eid, bundle):
"""Send bundle to node specified by EID
Args:
dst_eid (str): EID of node to send bundle to
bundle (Union[bytes, bytearray]): Bundle to send
Returns:
None
Raises:
NoNeighborForEIDError: No neighbor with specified EID found
ConnectFailedError: Connection failed for all options to
connect to neighbor with specified EID
"""
logger.debug(f'ConnectionManager: Trying to send bundle to "{dst_eid}"')
conn = self._pick_a_connection()
if conn is None:
......
"""Package containing Convergence Layer Adapters
Conforms to::
https://tools.ietf.org/html/draft-ietf-dtn-bpbis-13#section-7.1
Note:
Each adapter class should be imported below, so name of the class
can be used in PyDTN configuration.
"""
from .tcp_cl import TCPCL
"""Interface to Convergence Layer Adapters
Convergence layer adapters should be accessed from bundle protocol
layer using interfaces (base classes) defined in this file
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from ...util.exceptions import AlreadyRegisteredError
# TODO: Choose different name to avoid confusion with 'Bundle endpoint' which is a different concept
@dataclass
class RemoteEndpoint(ABC):
"""Class representing a protocol specific connection endpoint (e.g. IP address and port)
Not to be confused with 'Bundle endpoint' which is a different concept
"""
class ConvergenceLayerNeighbor(ABC):
"""Class representing a protocol specific connection endpoint (e.g. IP address and port)"""
eid: str
class ConnectFailedError(Exception):
pass
"""Raised when connection fails"""
class ConvergenceLayerAdapter(ABC):
"""Abstract base class for Convergence Layer Adapters (CLAs)"""
def __init_subclass__(cls, **kwargs):
endpoint_class = cls.endpoint_class
"""Automatize class pairing
Called when new subclass is defined.
Each ConvergenceLayerAdapter subclass should define a pointer
to corresponding ConvergenceLayerNeighbor subclass.
Make the latter automatically point back to the former.
"""
neighbor_class = cls.neighbor_class
try:
endpoint_class.cla_class
neighbor_class.cla_class
except AttributeError:
endpoint_class.cla_class = cls
neighbor_class.cla_class = cls
else:
raise ValueError(f'{endpoint_class} already has attribute cla_class')
raise ValueError(f'{neighbor_class} already has attribute cla_class')
def __init__(self, endpoint):
self.node_id = endpoint['node-id']
self._endpoints = []
def __init__(self, cla_config):
"""Initialize CLA from supplied configuration
Args:
cla_config (dict): CLA configuration
"""
self.node_id = cla_config['node-id']
self._neighbors = []
@abstractmethod
async def connect(self, endpoint):
pass
async def connect(self, neighbor):
"""Connect to neighbor
Args:
neighbor: Neighbor to connect to
Returns:
ConvergenceLayerConnection: Object representing the established connection
"""
@abstractmethod
async def run_forever(self, upper_layer_handler):
pass
"""Block and process incoming data
Args:
upper_layer_handler: Callback to inform upper layer when
new bundle is starting to be received and there is already
some data that can be processed by upper layer
Returns:
None
"""
def register_neighbor(self, neighbor):
"""Register connection option for particular neighbor
def register_remote_endpoint(self, endpoint: RemoteEndpoint):
if endpoint in self._endpoints:
Args:
neighbor (ConvergenceLayerNeighbor): CLA-specific information about how to
connect to particular neighbor
Returns:
None
Raises:
AlreadyRegisteredError: If neighbor is already registered
"""
if neighbor in self._neighbors:
raise AlreadyRegisteredError
self._endpoints.append(endpoint)
self._neighbors.append(neighbor)
def get_neighbors_for_eid(self, eid):
"""Generator of neighbors matching specified EID
def get_remote_endpoints_for_eid(self, eid):
for ep in self._endpoints:
if ep.eid == eid:
yield ep
Args:
eid (str): EID that neighboring node should be registered to
Yields:
ConvergenceLayerNeighbor: Next neighbor found with specified EID
"""
for nbr in self._neighbors:
if nbr.eid == eid:
yield nbr
class ConvergenceLayerConnection(ABC):
"""CLA-specific connection"""
def __init__(self, cla_adapter):
"""Initialize CLA-specific connection"""
self._cla_adapter = cla_adapter
self._their_eid = None
@property
def our_eid(self):
"""Get EID of our side of the connection
Returns:
str: Our EID
"""
return self._cla_adapter.node_id
@property
def their_eid(self):
"""Get EID of connected peer
Returns:
str: Their EID
"""
return self._their_eid
@abstractmethod
async def start(self):
pass
"""Start the communication using CLA-specific protocol
Returns:
None
"""
@abstractmethod
async def send(self, data, bundle_start=True, bundle_end=True):
pass
"""Send data using connection
Args:
data (Union[bytes, bytearray]): Data to send
bundle_start (bool): Data is prefix of serialized bundle
bundle_end (bool): Data is suffix of serialized bundle
Returns:
None
"""
@abstractmethod
async def recv(self):
pass
"""Receive data from connection
Returns:
Tuple[Union[bytes, bytearray], bool, bool]:
(Received data,
Data is prefix of serialized bundle,
Data is suffix of serialized bundle)
"""
@abstractmethod
def close(self):
pass
"""Close connection
Returns:
None
"""
"""Base class for CLAs that use streams"""
import logging
from .api import ConvergenceLayerConnection
......@@ -9,7 +11,19 @@ logger = logging.getLogger(__name__)
class StreamedConvergenceLayerConnection(ConvergenceLayerConnection):
"""Base class for CLAs that use streams
Contains boilerplate and convenient functions for stream manipulation
"""
def __init__(self, cla_adapter, reader, writer):
"""Initialize CLA-specific connection
Args:
cla_adapter (dict): CLA configuration
reader (asyncio.StreamReader): Reader object of underlying connection
writer (asyncio.StreamWriter): Writer object of underlying connection
"""
super().__init__(cla_adapter)
self._their_eid = None
self._reader = WritebackReader(reader)
......@@ -31,4 +45,9 @@ class StreamedConvergenceLayerConnection(ConvergenceLayerConnection):
self._reader.unread(data)
def close(self):
"""Close stream
Returns:
None
"""
self._writer.close()
# TCPCL (https://tools.ietf.org/html/rfc7242)
"""TCPCL
Conforms to::
https://tools.ietf.org/html/rfc7242
"""
import logging
from dataclasses import dataclass
from .api import ConvergenceLayerAdapter, RemoteEndpoint, ConnectFailedError
from .api import ConvergenceLayerAdapter, ConvergenceLayerNeighbor, ConnectFailedError
from .streamed_cl_base import StreamedConvergenceLayerConnection
from ...encoding.sdnv import sdnv_decode, sdnv_encode
from ...encoding.tcpcl import ObjDef as TCPCLObjDef, MessageType, ContactMessage
......@@ -14,25 +18,44 @@ logger = logging.getLogger(__name__)
@dataclass
class RemoteTCPEndpoint(RemoteEndpoint):
class TCPCLNeighbor(ConvergenceLayerNeighbor):
"""TCP connection endpoint specified by IP address and port"""
host: str
port: int
class TCPCL(ConvergenceLayerAdapter):
endpoint_class = RemoteTCPEndpoint
"""TCPCL Adapter"""
neighbor_class = TCPCLNeighbor
def __init__(self, endpoint):
super().__init__(endpoint)
self._host = endpoint['host']
self._port = endpoint['port']
def __init__(self, cla_config):
"""Initialize TCPCL from supplied configuration
Args:
cla_config (dict): TCPCL configuration
"""
super().__init__(cla_config)
self._host = cla_config['host']
self._port = cla_config['port']
self._upper_layer_handler = None
async def connect(self, endpoint):
assert isinstance(endpoint, RemoteTCPEndpoint)
logger.info(f'TCPCL: Connecting to endpoint: {endpoint.host}:{endpoint.port}')
async def connect(self, neighbor):
"""Establish connection to specified neighbor
Args:
neighbor (TCPCLNeighbor): Neighbor to connect to
Returns:
TCPCLConnection: Handle to established connection
Raises:
ConnectFailedError: When connection could not be established
"""
assert isinstance(neighbor, TCPCLNeighbor)
logger.info(f'TCPCL: Connecting to: {neighbor.host}:{neighbor.port}')
try:
reader, writer = await uio.open_connection(endpoint.host, endpoint.port)
reader, writer = await uio.open_connection(neighbor.host, neighbor.port)
except ConnectionRefusedError or TimeoutError:
raise ConnectFailedError
conn = TCPCLConnection(self, reader, writer)
......@@ -57,6 +80,16 @@ class TCPCL(ConvergenceLayerAdapter):
conn.close() # TODO: Don't automatically close connection after one bundle has been received
async def run_forever(self, upper_layer_handler):
"""Block and process incoming data
Args:
upper_layer_handler: Callback to inform upper layer when
new bundle is starting to be received and there is already
some data that can be processed by upper layer
Returns:
None
"""
self._upper_layer_handler = upper_layer_handler
logger.info(f'TCPCL: listening on {self._host}:{self._port}')
await uio.serve_forever(self._request_handler, self._host, self._port)
......@@ -64,6 +97,8 @@ class TCPCL(ConvergenceLayerAdapter):
class TCPCLConnection(StreamedConvergenceLayerConnection):
"""TCPCL-specific connection"""
async def _recv_sdnv(self):
data = await self._recv(16)
try:
......@@ -79,20 +114,20 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
# logger.debug(f'TCPCL: Sending contact header:\n {contact_msg}')
await self._send(contact_msg)
class ContactMessageReadError(Exception):
pass
class ContactMessageError(Exception):
"""Raised when received contact message has wrong format"""
class InvalidMessageTypeError(Exception):
pass
"""Raised when received message type code is invalid"""
class ProtocolError(Exception):
pass
"""Raised when received data has wrong format"""
async def _recv_contact_msg(self):
data = await self._recv_exact(len(TCPCLObjDef.MAGIC))
if data != TCPCLObjDef.MAGIC:
logger.info('Got wrong MAGIC in contact message!')
raise self.ContactMessageReadError
raise self.ContactMessageError
# Skip directly to Node ID
await self._recv_exact(4)
......@@ -103,7 +138,7 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
len_of_payload, bytes_decoded = sdnv_decode(buffer=data)
except ValueError:
logger.info('Unexpected end of contact message!')
raise self.ContactMessageReadError
raise self.ContactMessageError
self._unrecv(data[bytes_decoded:])
data = await self._recv_exact(len_of_payload)
......@@ -112,14 +147,17 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
except UnicodeDecodeError:
peer_eid = data
logger.debug(f'Received peer EID: {peer_eid}')
# TODO: If connection created by us, compare to EID of registered target endpoint
# TODO: If connection created by us, compare to EID of registered neighbor
async def _send_segment(self, msg_type, msg_flags, content_bytes):
"""Send segment
https://tools.ietf.org/html/rfc7242#section-5
"""
Conforms to::
https://tools.ietf.org/html/rfc7242#section-5
Returns:
None
"""
data = bytearray()
data.append(msg_type << 4 | msg_flags)
data += content_bytes
......@@ -128,9 +166,12 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
async def _recv_segment(self):
"""Receive segment
https://tools.ietf.org/html/rfc7242#section-5
"""
Conforms to::
https://tools.ietf.org/html/rfc7242#section-5
Returns:
None
"""
header = await self._recv(1)
msg_type = MessageType(header[0] >> 4)
msg_flags = header[0] & 0xf
......@@ -167,6 +208,14 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
return msg_type, msg_flags, content
async def start(self):
"""Start the communication by exchanging contact messages with the peer
Returns:
None
Raises:
TCPCLConnection.ContactMessageError: When received contact message has wrong format
"""
await self._send_contact_msg()
await self._recv_contact_msg()
logger.info('TCPCL: Contact messages exchanged, connection ready to be used')
......@@ -174,9 +223,17 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
async def send(self, data, bundle_start=True, bundle_end=True):
"""Send data as one bundle data segment
https://tools.ietf.org/html/rfc7242#section-5.2
"""
Conforms to::
https://tools.ietf.org/html/rfc7242#section-5.2
Args:
data (Union[bytes, bytearray]): Data to send
bundle_start (bool): Data is prefix of serialized bundle
bundle_end (bool): Data is suffix of serialized bundle
Returns:
None
"""
content_bytes = bytearray()
content_bytes += sdnv_encode(len(data))
content_bytes += data
......@@ -185,10 +242,19 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
async def recv(self):
"""Receive one bundle data segment
https://tools.ietf.org/html/rfc7242#section-5.2
Return buffer containing payload of received segment
"""
Conforms to::
https://tools.ietf.org/html/rfc7242#section-5.2
Returns:
Tuple[bytearray, bool, bool]:
(Payload of received segment,
Data is prefix of serialized bundle,
Data is suffix of serialized bundle)
Raises:
TCPCLConnection.InvalidMessageTypeError: When received message type code is invalid
TCPCLConnection.ProtocolError: When received data has wrong format
"""
while True:
msg_type, msg_flags, content = await self._recv_segment()
if msg_type == MessageType.DATA_SEGMENT:
......@@ -200,5 +266,10 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
return content, first_bundle_segment, last_bundle_segment
def close(self):
"""Close connection
Returns:
None
"""
logger.info(f'TCPCL: Closing connection')
super().close()
"""Miscellaneous functions and definitions"""
import datetime
import time
......
......@@ -2,6 +2,10 @@
A simple Python package for generating and serialize BPbis bundles into CBOR.
Conforms to::
https://tools.ietf.org/html/draft-ietf-dtn-bpbis-10
(which is compatible with uPCN 0.6.0)
Dependencies:
This module depends on the ``cbor`` package which can be installed via pip:
......
......@@ -111,6 +111,7 @@ class CRC(object):
# string
assert crc(b"Hello world!") == 0x1b851995
"""
def __init__(self, width, polynomial, initial_value, final_xor_value,
input_reflected, result_reflected):
self.width = width
......
"""IPND objects with encoding/decoding"""
import struct
from .sdnv import sdnv_decode, sdnv_encode
......
"""SDNV encoding/decoding"""
# Author: Marco 'don' Kaulea
# Home Page: https://github.com/Don42/sdnv
# License: Apache 2.0
......
"""TCPCL (https://tools.ietf.org/html/rfc7242) definitions"""
import logging
import struct
from enum import IntEnum
......
"""Main module"""
import asyncio
import logging.config
......@@ -26,6 +28,14 @@ def setup_logging(log_config):
async def start(config=None):
"""Run Bundle protocol agent
Args:
config: Configuration of Bundle protocol agent
Returns:
None
"""
config = load_config(config)
setup_logging(config['log'])
......
"""Bundle 7 tests"""
from datetime import timedelta
from ..encoding.bundle7 import Bundle, BundleAgeBlock, BundleProcFlag, BundleStatusReport, CreationTimestamp, HopCountBlock,\
......
"""Miscellaneous tests"""
from ..bundle_protocol.misc import unix2dtn, dtn2unix, serialize_bundle, NULL_EID, serialize_upcn_config_message
from ..bundle_protocol.convergence_layers.tcp_cl import TCPCLConnection
from ..encoding.sdnv import sdnv_encode, sdnv_decode
......
"""Job scheduler using asyncio"""
import asyncio
import logging
import time
......@@ -7,6 +9,7 @@ logger = logging.getLogger(__name__)
class AIOScheduler:
"""Job scheduler using asyncio"""