Commit 838cf0e2 authored by Juraj Sloboda's avatar Juraj Sloboda Committed by Matej Feder

Configuration of contacts and bundle scheduling

parent 14340dc2
......@@ -29,6 +29,11 @@ services:
'dtn://ops-sat.dtn':
host: 172.25.1.21
port: 4556
contacts:
'dtn://ops-sat.dtn':
- start-time: '2019-08-01 00:00:00'
end-time: '2020-08-01 00:00:00'
date-time-format: '%Y-%m-%d %H:%M:%S'
api:
host: 0.0.0.0
port: 8081
......@@ -62,6 +67,11 @@ services:
'dtn://ops-sat.dtn':
host: 172.25.2.21
port: 4556
contacts:
'dtn://ops-sat.dtn':
- start-time: '2019-08-01 00:00:00'
end-time: '2020-08-01 00:00:00'
date-time-format: '%Y-%m-%d %H:%M:%S'
api:
host: 0.0.0.0
port: 8082
......
import datetime
import logging
import time
from cbor2 import CBORDecoder
from ..encoding.bundle7 import EID
from ..util.aio_scheduler import AIOScheduler
from ..util.exceptions import AlreadyRegisteredError
from ..util.io_util import CountingBytesIO
from ..util.hexdump import hexdump
from . import convergence_layers
from .convergence_layers.api import ConnectFailedError
from .bp_node import get_bp_node_id
from .conn_manager import ConnectionManager, UnknownEID, NoContact
from .conn_manager import ConnectionManager, NoEndpointsForEIDError
from . import misc
......@@ -21,12 +26,46 @@ class ClientNotRegistered(Exception):
self.eid = eid
class NoContactError(Exception):
def __init__(self, eid):
self.eid = eid
class BPAgent:
def __init__(self):
self._eid_registrations = []
self._routes = []
self._bundles_to_send = []
self._contacts = {}
self._conn_manager = ConnectionManager(self._recv_callback)
self._scheduler = AIOScheduler()
self._date_time_format = None
def init(self, node_config):
self._date_time_format = node_config['date-time-format']
if 'routes' in node_config:
for key, value in node_config['routes'].items():
self.add_route(dst_eid=key, via_eid=value)
for cla_name, cla_config in node_config['convergence-layer-adapters'].items():
cla_class = getattr(convergence_layers, cla_name)
cla = cla_class(cla_config)
self.register_cl_adapter(cla)
if 'neighbors' in cla_config:
for neighbor_eid, neighbor_config in cla_config['neighbors'].items():
# Save the neighbor EID inside the neighbor config
neighbor_config['eid'] = neighbor_eid
neighbor_class = cla_class.endpoint_class
self.register_remote_endpoint(neighbor_class(neighbor_config))
date_time_format = node_config['date-time-format']
if 'contacts' in node_config:
for key, value in node_config['contacts'].items():
for contact in value:
start_time = time.mktime(time.strptime(contact['start-time'], date_time_format))
end_time = time.mktime(time.strptime(contact['end-time'], date_time_format))
self.add_contact(key, start_time, end_time)
def register_cl_adapter(self, adapter):
self._conn_manager.register_cl_adapter(adapter)
......@@ -51,6 +90,37 @@ class BPAgent:
return client
return None
def add_contact(self, eid, start_time, end_time):
if start_time >= end_time:
raise ValueError('start_time should be less than end_time')
start_time_str = datetime.datetime.fromtimestamp(start_time).strftime('%c')
end_time_str = datetime.datetime.fromtimestamp(end_time).strftime('%c')
logging.info(f'BP Agent adding contact for node "{eid}" - start: {start_time_str}, end: {end_time_str}')
contact = (start_time, end_time)
if eid not in self._contacts:
self._contacts[eid] = [contact]
else:
self._contacts[eid].append(contact)
def _in_contact(self, eid):
if eid not in self._contacts:
return False
system_time = time.time()
for contact in self._contacts[eid]:
if contact[0] <= system_time <= contact[1]:
return True
def _get_earliest_contact_start_time(self, eid):
system_time = time.time()
if self._in_contact(eid):
return system_time
if eid not in self._contacts:
return None
start_times = [contact[0] for contact in self._contacts[eid] if system_time <= contact[1]]
if not any(start_times):
return None
return min(start_times)
def add_route(self, via_eid, dst_eid=None):
if (dst_eid, via_eid) in self._routes:
raise AlreadyRegisteredError
......@@ -72,11 +142,8 @@ class BPAgent:
async def _send_bundle(self, dst_eid, bundle):
try:
await self._conn_manager.try_to_send(dst_eid, bundle)
except UnknownEID:
logging.info(f'BP Agent doesn\'t know how to contact "{dst_eid}" node')
except NoContact:
logging.info(f'BP Agent is unable to contact "{dst_eid}" node right now, saving bundle')
self._bundles_to_send.append(bundle)
except NoEndpointsForEIDError:
logging.info(f'BP Agent doesn\'t know how to connect to "{dst_eid}" node')
except ConnectFailedError:
logging.info(f'BP Agent failed to connect to "{dst_eid}" node')
......@@ -85,7 +152,16 @@ class BPAgent:
via_eid = self._get_via_eid(destination)
if via_eid is None:
via_eid = destination
await self._send_bundle(via_eid, data)
if self._in_contact(via_eid):
await self._send_bundle(via_eid, data)
else:
start_time = self._get_earliest_contact_start_time(via_eid)
if start_time is not None:
logging.info(f'BP Agent is scheduling job to send bundle to "{via_eid}"')
self._scheduler.schedule_job(start_time, self._send_bundle(via_eid, data))
else:
logging.warning(f'There is no scheduled contact for node "{via_eid}", removing bundle') # TODO: Try to use different route
raise NoContactError(via_eid)
async def create_and_send_bundle_msg(self, source, destination, payload):
logging.info(f'BP Agent received request to send bundle (src="{source}", dst="{destination}")')
......
......@@ -5,11 +5,7 @@ from .convergence_layers.api import ConvergenceLayerAdapter, ConnectFailedError
from ..util.exceptions import AlreadyRegisteredError
class UnknownEID(Exception):
pass
class NoContact(Exception):
class NoEndpointsForEIDError(Exception):
pass
......@@ -75,26 +71,19 @@ class ConnectionManager:
if some_conn_failed:
raise ConnectFailedError
else:
raise UnknownEID
raise NoEndpointsForEIDError
def _pick_a_connection(self):
if not any(self._connections):
return None
return self._connections[0]
def _destination_node_in_reach(self, dst_eid):
return True # TODO
async def try_to_send(self, dst_eid, bundle):
logging.debug(f'ConnectionManager: Trying to send bundle to "{dst_eid}"')
conn = self._pick_a_connection()
if conn is None:
if self._destination_node_in_reach(dst_eid):
logging.debug('ConnectionManager: No connection exists, creating one')
conn = await self._connect(dst_eid)
else:
logging.debug('ConnectionManager: No connection exists and destination node not in reach')
raise NoContact
logging.debug('ConnectionManager: No connection exists, creating one')
conn = await self._connect(dst_eid)
else:
logging.debug('ConnectionManager: Using existing connection')
await conn.send(bundle)
import asyncio
import logging.config
import time
import pkg_resources
import yaml
from .bundle_protocol import bp_agent
from .bundle_protocol.bp_node import set_bp_node_id
from .bundle_protocol import convergence_layers
def load_config(config):
......@@ -31,20 +32,7 @@ async def _start(config=None):
set_bp_node_id(node_id)
if 'routes' in node_config:
for key, value in node_config['routes'].items():
bp_agent.add_route(dst_eid=key, via_eid=value)
for cla_name, cla_config in node_config['convergence-layer-adapters'].items():
cla_class = getattr(convergence_layers, cla_name)
cla = cla_class(cla_config)
bp_agent.register_cl_adapter(cla)
if 'neighbors' in cla_config:
for neighbor_eid, neighbor_config in cla_config['neighbors'].items():
# Save the neighbor EID inside the neighbor config
neighbor_config['eid'] = neighbor_eid
neighbor_class = cla_class.endpoint_class
bp_agent.register_remote_endpoint(neighbor_class(neighbor_config))
bp_agent.init(node_config)
# Start Bundle protocol agent
await bp_agent.run_forever()
......
......@@ -6,6 +6,7 @@ node:
TCPCL:
host: 0.0.0.0
port: 4556
date-time-format: '%Y-%m-%d %H:%M:%S'
# === LOGGING ===
......
import asyncio
import logging
import time
class AIOScheduler:
def __init__(self):
self.aio_tasks = []
def _call_at(self, date_time, fun_or_coro):
loop = asyncio.get_event_loop()
system_time = time.time()
sleep_time = date_time - system_time
if callable(fun_or_coro):
return loop.call_later(sleep_time, fun_or_coro)
async def waiting_and_job():
logging.info(f'Job scheduled to run in {round(sleep_time)} sec')
await asyncio.sleep(sleep_time)
await fun_or_coro
aio_task = asyncio.create_task(waiting_and_job())
self.aio_tasks.append(aio_task)
def schedule_job(self, date_time, coro):
self._call_at(date_time, coro)
def start(self):
return
......@@ -7,6 +7,7 @@ from aiohttp import web
import pydtn
from pydtn.bundle_protocol import bp_agent
from pydtn.bundle_protocol.bp import NoContactError
def load_config(config):
......@@ -38,9 +39,13 @@ async def handle_config(request):
return web.json_response({'success': True})
except NoContactError as e:
return web.json_response({'success': False,
'details': f'There is no scheduled contact for node "{e.eid}"'})
except Exception:
logging.exception('Exception occurred while processing HTTP request')
return web.json_response({'success': False})
return web.json_response({'success': False,
'details': 'Internal error'})
@routes.post('/bundle')
......@@ -57,9 +62,13 @@ async def handle_bundle(request):
return web.json_response({'success': True})
except NoContactError as e:
return web.json_response({'success': False,
'details': f'There is no scheduled contact for node "{e.eid}"'})
except Exception:
logging.exception('Exception occurred while processing HTTP request')
return web.json_response({'success': False})
return web.json_response({'success': False,
'details': 'Internal error'})
async def run_app(app, host, port):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment