Commit 14340dc2 authored by Juraj Sloboda's avatar Juraj Sloboda Committed by Matej Feder

Code review and improvements

parent 72b4684e
import argparse
import asyncio
import json
import logging.config
import yaml
import pydtn_rest
import app
logging.basicConfig(format='%(asctime)s - [%(levelname)-5s] - %(message)s', level=logging.DEBUG)
parser = argparse.ArgumentParser()
parser.add_argument("--yaml", type=str, default='')
parser.add_argument("--json", type=str, default='')
def load_config():
args = parser.parse_args()
# forbid using both formats at once
assert args.yaml == '' or args.json == ''
if args.yaml != '':
yaml_config = yaml.safe_load(args.yaml)
print(f'command line --yaml has value:\n{args.yaml}')
return yaml_config
if args.json != '':
json_config = json.loads(args.json)
print(f'command line --json has value:\n{args.json}')
return json_config
return {}
def start():
config = load_config()
# Start example application that prints received bundle payloads
app.start(config)
asyncio.run(pydtn_rest.start(config))
start()
import logging
from pydtn.bundle_protocol import bp_agent
from pydtn.bundle_protocol.bp import EndOfData
async def recv_callback(ctx, app_eid, src_eid):
logging.info(f'BP client "{app_eid}" receiving ADU from "{src_eid}"')
while True:
try:
data = await bp_agent.receive(ctx)
except EndOfData:
break
print(data.decode('ascii'), end='', flush=True)
print()
def start(config):
bp_agent.register(config['app-eid'], recv_callback)
import argparse
import logging
from pydtn.bundle_protocol.convergence_layers.tcp_cl import RemoteTCPEndpoint
from pydtn.bundle_protocol import bp_agent
from pydtn.bundle_protocol.bp import EndOfData
from pydtn import start as bp_start
from pydtn_rest import start as rest_start
logging.basicConfig(format='%(asctime)s - [%(name)s] - [%(levelname)-5s] - %(message)s', level=logging.DEBUG)
our_eid = None
bp_client = None
async def recv_callback(ctx, src_eid):
global our_eid
logging.info(f'BP client "{our_eid}" receiving ADU from "{src_eid}"')
while True:
try:
data = await bp_agent.receive(ctx)
except EndOfData:
break
print(data.decode('ascii'), end='', flush=True)
print()
parser = argparse.ArgumentParser()
parser.add_argument("--node", type=str)
parser.add_argument("--port", type=int, default=42420)
parser.add_argument("--http-port", type=int, default=8080)
parser.add_argument("--upcn-eid", type=str, default='dtn://ops-sat.dtn')
parser.add_argument("--upcn-host", type=str, default="127.0.0.1")
parser.add_argument("--upcn-port", type=int, default=4556)
if __name__ == '__main__':
args = parser.parse_args()
# Start Bundle protocol server
bp_start(args.node, args.port)
# Register uPCN node as neighbor accessible through TCP
bp_agent.register_remote_endpoint(RemoteTCPEndpoint(args.upcn_eid, args.upcn_host, args.upcn_port))
# Send all traffic through uPCN node
bp_agent.add_route(None, args.upcn_eid)
# Start REST endpoint
rest_start(args.http_port)
# Start example application that prints received bundle payloads
our_eid = args.node
bp_app = bp_agent.register(our_eid, recv_callback)
......@@ -9,16 +9,30 @@ services:
ports:
- "8081:8081"
tty: true
restart: always
volumes:
- ..:/src
- ..:/src:ro
- /etc/timezone:/etc/timezone:ro
- /etc/localtime:/etc/localtime:ro
command: >
/src/docker/pydtn_run.sh
--node dtn://pyDTN-1.dtn
--port 2001
--http-port 8081
--upcn-host 172.25.1.21
--upcn-port 4556
/src/docker/pydtn_demo_run.sh --yaml "
app-eid: dtn://pyDTN-1.dtn/demo_app
node:
id: dtn://pyDTN-1.dtn
routes:
# Send all traffic through uPCN node
null: dtn://ops-sat.dtn
convergence-layer-adapters:
TCPCL:
host: 0.0.0.0
port: 2001
neighbors:
'dtn://ops-sat.dtn':
host: 172.25.1.21
port: 4556
api:
host: 0.0.0.0
port: 8081
"
networks:
dtnnet1:
ipv4_address: 172.25.1.11
......@@ -29,16 +43,29 @@ services:
ports:
- "8082:8082"
tty: true
restart: always
volumes:
- ..:/src
- ..:/src:ro
- /etc/timezone:/etc/timezone:ro
- /etc/localtime:/etc/localtime:ro
command: >
/src/docker/pydtn_run.sh
--node dtn://pyDTN-2.dtn
--port 2002
--http-port 8082
--upcn-host 172.25.2.21
--upcn-port 4556
/src/docker/pydtn_demo_run.sh --yaml "
app-eid: dtn://pyDTN-2.dtn/demo_app
node:
id: dtn://pyDTN-2.dtn
routes:
'dtn://ops-sat.dtn/config': dtn://ops-sat.dtn
convergence-layer-adapters:
TCPCL:
host: 0.0.0.0
port: 2002
neighbors:
'dtn://ops-sat.dtn':
host: 172.25.2.21
port: 4556
api:
host: 0.0.0.0
port: 8082
"
networks:
dtnnet2:
ipv4_address: 172.25.2.11
......
#!/usr/bin/env sh
pip install /src/pydtn
pip install /src/pydtn_rest
python /src/demo_app "$@"
# Don't stop the container automatically so the user can attach to it for debugging
sleep infinity
#!/usr/bin/env sh
pip install --editable /src/pydtn
pip install --editable /src/pydtn_rest
python /src/demo_app/main.py "$@"
......@@ -37,7 +37,7 @@ class BPAgent:
async def run_forever(self):
await self._conn_manager.run_forever()
def register(self, eid, recv_callback):
def register(self, eid, recv_callback=None):
if eid in (reg._app_singleton_eid for reg in self._eid_registrations):
raise AlreadyRegisteredError
logging.info(f'BP Agent is registering client EID "{eid}"')
......@@ -51,19 +51,22 @@ class BPAgent:
return client
return None
def add_route(self, target_eid, via_eid):
if (target_eid, via_eid) in self._routes:
def add_route(self, via_eid, dst_eid=None):
if (dst_eid, via_eid) in self._routes:
raise AlreadyRegisteredError
if target_eid is not None:
logging.info(f'BP Agent adding route to "{target_eid}" via "{via_eid}"')
if dst_eid is not None:
logging.info(f'BP Agent adding route to "{dst_eid}" via "{via_eid}"')
else:
logging.info(f'BP Agent adding default route via "{via_eid}"')
self._routes.append((target_eid, via_eid))
self._routes.append((dst_eid, via_eid))
def _get_via_eid(self, target_eid):
logging.debug(f'BP Agent is looking for a route to "{target_eid}"')
for target, via in self._routes:
if target is None or target == target_eid:
logging.debug(f'BP Agent found route to "{target_eid}" via "{via}"')
return via
logging.debug(f'BP Agent didn\'t find route to "{target_eid}"')
return None
async def _send_bundle(self, dst_eid, bundle):
......@@ -77,15 +80,18 @@ class BPAgent:
except ConnectFailedError:
logging.info(f'BP Agent failed to connect to "{dst_eid}" node')
async def _send_as_bundle(self, source, destination, payload):
async def _create_and_send_bundle_msg(self, source, destination, payload):
data = misc.serialize_bundle(source, destination, payload)
via_eid = self._get_via_eid(destination)
if via_eid is None:
logging.info(f'BP Agent cannot find route to "{destination}"')
return
via_eid = destination
await self._send_bundle(via_eid, data)
async def send_config_msg(self, target_eid, eid, cla_address, start_offset, end_offset):
async def create_and_send_bundle_msg(self, source, destination, payload):
logging.info(f'BP Agent received request to send bundle (src="{source}", dst="{destination}")')
await self._create_and_send_bundle_msg(source, destination, payload)
async def create_and_send_config_msg(self, target_eid, eid, cla_address, start_offset, end_offset):
logging.info('BP Agent received request to send uPCN config message')
inner_msg = misc.serialize_upcn_config_message(
eid,
......@@ -94,7 +100,7 @@ class BPAgent:
misc.make_contact(start_offset, end_offset, 500),
],
)
await self._send_as_bundle(get_bp_node_id(), target_eid, inner_msg)
await self._create_and_send_bundle_msg(get_bp_node_id(), target_eid, inner_msg)
@staticmethod
async def receive(context):
......@@ -134,7 +140,7 @@ class BPAgent:
client = self._get_client_by_eid(client_eid)
if client is None:
raise ClientNotRegistered(client_eid)
return self._get_client_by_eid(client_eid), src_eid
return self._get_client_by_eid(client_eid), client_eid, src_eid
@staticmethod
def _translate_to_upper_layer(data):
......@@ -156,7 +162,7 @@ class BPAgent:
logging.debug(f'BP Agent received data of length {len(data)}')
added_data += data
try:
bp_client, src_eid = self._determine_bp_client_and_src_eid(added_data)
bp_client, app_eid, src_eid = self._determine_bp_client_and_src_eid(added_data)
except BPAgent._CannotDetermineBpClientAndSrcEIDYet:
pass
else:
......@@ -171,7 +177,7 @@ class BPAgent:
else:
await bp_client._recv_callback(
self.PayloadReader(translated_data, last_bundle_segment, conn, total_len),
src_eid)
app_eid, src_eid)
return
data, first_bundle_segment, last_bundle_segment = await conn.recv()
if last_bundle_segment:
......@@ -210,6 +216,5 @@ class BPPrivateAPI:
self._app_singleton_eid = eid
self._recv_callback = recv_callback
async def send_bundle_msg(self, destination, payload):
logging.info('BP Agent received request to send bundle')
await self._bp_agent._send_as_bundle(self._app_singleton_eid, destination, payload)
async def create_and_send_bundle_msg(self, destination, payload):
await self._bp_agent.create_and_send_bundle_msg(self._app_singleton_eid, destination, payload)
import asyncio
from typing import Type
import logging
from .convergence_layers.api import ConvergenceLayerAdapter, ConnectFailedError
......@@ -21,30 +20,13 @@ class ConnectionManager:
self._connections = []
self._request_handler = request_handler
def _register_cl_adapter_instance(self, adapter: ConvergenceLayerAdapter):
def register_cl_adapter(self, adapter):
if not isinstance(adapter, ConvergenceLayerAdapter):
raise ValueError
if adapter in self._cl_adapter_instances:
raise AlreadyRegisteredError()
raise AlreadyRegisteredError
self._cl_adapter_instances.append(adapter)
def _register_cl_adapter_class(self, adapter_class: Type[ConvergenceLayerAdapter]):
"""Convenient function"""
if adapter_class in self._cl_adapter_classes:
raise AlreadyRegisteredError()
self._cl_adapter_classes.append(adapter_class)
self._register_cl_adapter_instance(adapter_class())
def register_cl_adapter(self, adapter):
if isinstance(adapter, type):
if issubclass(adapter, ConvergenceLayerAdapter):
self._register_cl_adapter_class(adapter)
else:
raise TypeError
else:
if isinstance(adapter, ConvergenceLayerAdapter):
self._register_cl_adapter_instance(adapter)
else:
raise ValueError
def register_remote_endpoint(self, remote_endpoint):
for adapter in self._cl_adapter_instances:
if isinstance(adapter, remote_endpoint.cla_class):
......@@ -65,8 +47,10 @@ class ConnectionManager:
async def request_handler(self, conn):
self._add_connection(conn)
await self._request_handler(conn)
self._remove_connection(conn) # TODO: Don't automatically close connection after one bundle has been received
try:
await self._request_handler(conn)
finally:
self._remove_connection(conn) # TODO: Don't automatically close connection after one bundle has been received
async def run_forever(self):
await asyncio.wait([asyncio.create_task(cl_adapter.run_forever(self.request_handler))
......
import logging
from abc import ABC, abstractmethod
from ..bp_node import get_bp_node_id
from ...util.configure import Configurable
from ...util.exceptions import AlreadyRegisteredError
......@@ -11,18 +11,25 @@ class RemoteEndpoint(ABC):
Not to be confused with 'Bundle endpoint' which is a different concept
"""
def __init__(self, eid):
self.eid = eid
def __init__(self, endpoint):
self.eid = endpoint['eid']
class ConnectFailedError(Exception):
pass
class ConvergenceLayerAdapter(Configurable):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class ConvergenceLayerAdapter(ABC):
def __init_subclass__(cls, **kwargs):
endpoint_class = cls.endpoint_class
try:
endpoint_class.cla_class
except AttributeError:
endpoint_class.cla_class = cls
else:
raise ValueError(f'{endpoint_class} already has attribute cla_class')
def __init__(self):
self._endpoints = []
@abstractmethod
......@@ -45,8 +52,6 @@ class ConvergenceLayerAdapter(Configurable):
class ConvergenceLayerConnection(ABC):
cl_type = ConvergenceLayerAdapter
def __init__(self):
self._their_eid = None
......
......@@ -9,26 +9,25 @@ from ...encoding.tcpcl import ObjDef as TCPCLObjDef, MessageType, ContactMessage
from ...util import uio
class TCPCLAdapter(ConvergenceLayerAdapter):
DEFAULT_CONFIG = {
'ip': '0.0.0.0',
'port': 4556,
}
class RemoteTCPEndpoint(RemoteEndpoint):
def __init__(self, endpoint):
super().__init__(endpoint)
self.host = endpoint['host']
self.port = endpoint['port']
def configure(self, *args, **kwargs):
super().configure(*args, **kwargs)
# convenience
self._ip = self.config['ip']
self._port = self.config['port']
class TCPCL(ConvergenceLayerAdapter):
endpoint_class = RemoteTCPEndpoint
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def __init__(self, endpoint):
super().__init__()
self._host = endpoint['host']
self._port = endpoint['port']
self._upper_layer_handler = None
async def connect(self, endpoint):
assert isinstance(endpoint, RemoteTCPEndpoint)
logging.info(f'TCPCLAdapter: Connecting to endpoint: {endpoint.host}:{endpoint.port}')
logging.info(f'TCPCL: Connecting to endpoint: {endpoint.host}:{endpoint.port}')
try:
reader, writer = await uio.open_connection(endpoint.host, endpoint.port)
except ConnectionRefusedError or TimeoutError:
......@@ -44,25 +43,24 @@ class TCPCLAdapter(ConvergenceLayerAdapter):
async def _request_handler(self, reader, writer):
# TODO: Don't close the connection, but repeatedly call self._upper_layer_handler for every new bundle prefix
logging.info(f'TCPCLAdapter: Accepted connection')
logging.info(f'TCPCL: Accepted connection')
conn = TCPCLConnection(reader, writer)
try:
await conn.start()
await self._upper_layer_handler(conn)
except Exception:
logging.exception('Exception occurred inside _request_handler')
finally:
conn.close()
conn.close() # TODO: Don't automatically close connection after one bundle has been received
async def run_forever(self, upper_layer_handler):
self._upper_layer_handler = upper_layer_handler
logging.info(f'TCPCLAdapter: Server started: {self._ip}:{self._port}')
await uio.serve_forever(self._request_handler, self._ip, self._port)
logging.info(f'TCPCL: listening on {self._host}:{self._port}')
await uio.serve_forever(self._request_handler, self._host, self._port)
logging.info('TCPCL: exited')
class TCPCLConnection(StreamedConvergenceLayerConnection):
@property
def cla_class(self):
return TCPCLAdapter
def __init__(self, reader, writer):
super().__init__(reader, writer)
......@@ -78,7 +76,7 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
async def _send_contact_msg(self):
contact_msg = ContactMessage(self.our_eid).serialize()
# logging.debug(f'TCPCLAdapter: Sending contact header:\n {contact_msg}')
# logging.debug(f'TCPCL: Sending contact header:\n {contact_msg}')
await self._send(contact_msg)
class ContactMessageReadError(Exception):
......@@ -137,7 +135,7 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
msg_type = MessageType(header[0] >> 4)
msg_flags = header[0] & 0xf
logging.debug(f'TCPCLAdapter: Received segment of type {msg_type.name}')
logging.debug(f'TCPCL: Received segment of type {msg_type.name}')
if msg_type == MessageType.DATA_SEGMENT:
payload_len = await self._recv_sdnv()
......@@ -171,7 +169,7 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
async def start(self):
await self._send_contact_msg()
await self._recv_contact_msg()
logging.info('TCPCLAdapter: Contact messages exchanged, connection ready to be used')
logging.info('TCPCL: Contact messages exchanged, connection ready to be used')
async def send(self, data, bundle_start=True, bundle_end=True):
"""Send data as one bundle data segment
......@@ -202,16 +200,5 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
return content, first_bundle_segment, last_bundle_segment
def close(self):
logging.info(f'TCPCLAdapter: Closing connection')
logging.info(f'TCPCL: Closing connection')
super().close()
class RemoteTCPEndpoint(RemoteEndpoint):
@property
def cla_class(self):
return TCPCLAdapter
def __init__(self, eid, host, port):
super().__init__(eid)
self.host = host
self.port = port
......@@ -5,25 +5,64 @@ import yaml
from .bundle_protocol import bp_agent
from .bundle_protocol.bp_node import set_bp_node_id
from .bundle_protocol.convergence_layers.tcp_cl import TCPCLAdapter
from .bundle_protocol import convergence_layers
def load_config():
def load_config(config):
filename = pkg_resources.resource_filename('pydtn', 'pydtn.yaml')
with open(filename, 'rt') as file:
config = yaml.safe_load(file.read())
return config
default_config = yaml.safe_load(file.read())
return {**default_config, **config}
async def _start(config=None):
config = load_config(config)
def start(node_id, tcp_port, config=load_config()):
logging.config.dictConfig(config['log'])
logging.debug(f'pydtn config is:\n{yaml.safe_dump(config)}')
node_config = config['node']
node_id = node_config['id']
if node_id.endswith('default.dtn'):
logging.warning('Using default pyDTN config')
set_bp_node_id(node_id)
# Initialize TCP convergence layer adapter
tcp_cla = TCPCLAdapter(port=tcp_port)
bp_agent.register_cl_adapter(tcp_cla)
if 'routes' in node_config:
for key, value in node_config['routes'].items():
bp_agent.add_route(dst_eid=key, via_eid=value)
for cla_name, cla_config in node_config['convergence-layer-adapters'].items():
cla_class = getattr(convergence_layers, cla_name)
cla = cla_class(cla_config)
bp_agent.register_cl_adapter(cla)
if 'neighbors' in cla_config:
for neighbor_eid, neighbor_config in cla_config['neighbors'].items():
# Save the neighbor EID inside the neighbor config
neighbor_config['eid'] = neighbor_eid
neighbor_class = cla_class.endpoint_class
bp_agent.register_remote_endpoint(neighbor_class(neighbor_config))
# Start Bundle protocol agent
await bp_agent.run_forever()
_running = False
async def start(config=None):
global _running
if _running:
return
_running = True
try:
await _start(config)
finally:
_running = False
# Start Bundle protocol server
loop = asyncio.get_event_loop()
loop.run_until_complete(bp_agent.run_forever())
if __name__ == '__main__':
asyncio.run(start())
# === DEFAULT DTN NODE CONFIG ===
node:
id: dtn://default.dtn
convergence-layer-adapters:
TCPCL:
host: 0.0.0.0
port: 4556
# === LOGGING ===
log:
version: 1
formatters:
standard:
format: "%(asctime)s - [%(name)s] - [%(levelname)-5s] - %(message)s"
format: "%(asctime)s - [%(levelname)-5s] - %(message)s"
handlers:
console:
class: logging.StreamHandler
......
from abc import ABC
class WrongConfigKeyError:
pass
def args_to_dict(*args, **kwargs):
dictionary = {}
for dct in args:
dictionary.update(dct)
dictionary.update(kwargs)
return dictionary
def compose_config(args, kwargs, default_config):
user_config = args_to_dict(args, kwargs)
for key in user_config:
if key not in default_config:
raise WrongConfigKeyError
return {**default_config, **user_config}
class Configurable(ABC):
DEFAULT_CONFIG = {}
def __init__(self, *args, **kwargs):
self.configure(*args, **kwargs)
def configure(self, *args, **kwargs):
self.config = compose_config(args, kwargs, self.DEFAULT_CONFIG)
......@@ -15,5 +15,8 @@ setup(
url='https://git.ifne.eu/space-public/pyDTN/',
python_requires='>=3.7.3',
packages=find_packages(),
package_data={
'pydtn': ['pydtn.yaml']
},
install_requires=install_requires,
)
import asyncio
import logging
import logging.config
from aiohttp import web
import pkg_resources
import yaml
from aiohttp import web
import pydtn
from pydtn.bundle_protocol import bp_agent
from pydtn.bundle_protocol.bp_node import get_bp_node_id
def load_config():
def load_config(config):
filename = pkg_resources.resource_filename('pydtn_rest', 'pydtn_rest.yaml')
with open(filename, 'rt') as file:
config = yaml.safe_load(file.read())
return config
default_config = yaml.safe_load(file.read())
our_eid = None
bp_client = None
upcn_tcp_endpoint = None
return {**default_config, **config}
routes = web.RouteTableDef()
......@@ -27,41 +24,80 @@ routes = web.RouteTableDef()
async def handle_config(request):
logging.info('HTTPD: Got request on /config')
json = await request.json()
try:
json = await request.json()
if 'messages' not in json:
raise web.HTTPBadRequest
if 'messages' not in json:
raise web.HTTPBadRequest
for msg in json['messages']:
await bp_agent.send_config_msg('dtn://ops-sat.dtn/config',