Commit 4ef772f3 authored by Juraj Sloboda's avatar Juraj Sloboda Committed by Matej Feder

Code review and improve logging

parent 838cf0e2
......@@ -9,7 +9,9 @@ import pydtn_rest
import app
logging.basicConfig(format='%(asctime)s - [%(levelname)-5s] - %(message)s', level=logging.DEBUG)
logging.basicConfig(format='%(asctime)s - [%(name)s] - [%(levelname)-5s] - %(message)s', level=logging.DEBUG)
logger = logging.getLogger(__name__)
parser = argparse.ArgumentParser()
......@@ -25,12 +27,12 @@ def load_config():
if args.yaml != '':
yaml_config = yaml.safe_load(args.yaml)
print(f'command line --yaml has value:\n{args.yaml}')
logger.debug(f'command line --yaml has value:\n{args.yaml}')
return yaml_config
if args.json != '':
json_config = json.loads(args.json)
print(f'command line --json has value:\n{args.json}')
logger.debug(f'command line --json has value:\n{args.json}')
return json_config
return {}
......
......@@ -4,8 +4,11 @@ from pydtn.bundle_protocol import bp_agent
from pydtn.bundle_protocol.bp import EndOfData
logger = logging.getLogger(__name__)
async def recv_callback(ctx, app_eid, src_eid):
logging.info(f'BP client "{app_eid}" receiving ADU from "{src_eid}"')
logger.info(f'BP client "{app_eid}" receiving ADU from "{src_eid}"')
while True:
try:
data = await bp_agent.receive(ctx)
......
......@@ -17,6 +17,9 @@ from .conn_manager import ConnectionManager, NoEndpointsForEIDError
from . import misc
logger = logging.getLogger(__name__)
class EndOfData(Exception):
    # NOTE(review): raised/imported by the BP client receive path (see
    # `from pydtn.bundle_protocol.bp import EndOfData` and the
    # `bp_agent.receive` loop) — presumably signals that the peer has
    # finished sending the current ADU; confirm against bp_agent.receive.
    """Signal the end of the data stream for a received ADU."""
    pass
......@@ -79,7 +82,7 @@ class BPAgent:
def register(self, eid, recv_callback=None):
if eid in (reg._app_singleton_eid for reg in self._eid_registrations):
raise AlreadyRegisteredError
logging.info(f'BP Agent is registering client EID "{eid}"')
logger.info(f'BP Agent is registering client EID "{eid}"')
pa = BPPrivateAPI(self, eid, recv_callback)
self._eid_registrations.append(pa)
return pa
......@@ -95,7 +98,7 @@ class BPAgent:
raise ValueError('start_time should be less than end_time')
start_time_str = datetime.datetime.fromtimestamp(start_time).strftime('%c')
end_time_str = datetime.datetime.fromtimestamp(end_time).strftime('%c')
logging.info(f'BP Agent adding contact for node "{eid}" - start: {start_time_str}, end: {end_time_str}')
logger.info(f'BP Agent adding contact for node "{eid}" - start: {start_time_str}, end: {end_time_str}')
contact = (start_time, end_time)
if eid not in self._contacts:
self._contacts[eid] = [contact]
......@@ -125,27 +128,27 @@ class BPAgent:
if (dst_eid, via_eid) in self._routes:
raise AlreadyRegisteredError
if dst_eid is not None:
logging.info(f'BP Agent adding route to "{dst_eid}" via "{via_eid}"')
logger.info(f'BP Agent adding route to "{dst_eid}" via "{via_eid}"')
else:
logging.info(f'BP Agent adding default route via "{via_eid}"')
logger.info(f'BP Agent adding default route via "{via_eid}"')
self._routes.append((dst_eid, via_eid))
def _get_via_eid(self, target_eid):
logging.debug(f'BP Agent is looking for a route to "{target_eid}"')
logger.debug(f'BP Agent is looking for a route to "{target_eid}"')
for target, via in self._routes:
if target is None or target == target_eid:
logging.debug(f'BP Agent found route to "{target_eid}" via "{via}"')
logger.debug(f'BP Agent found route to "{target_eid}" via "{via}"')
return via
logging.debug(f'BP Agent didn\'t find route to "{target_eid}"')
logger.debug(f'BP Agent didn\'t find route to "{target_eid}"')
return None
async def _send_bundle(self, dst_eid, bundle):
try:
await self._conn_manager.try_to_send(dst_eid, bundle)
except NoEndpointsForEIDError:
logging.info(f'BP Agent doesn\'t know how to connect to "{dst_eid}" node')
logger.info(f'BP Agent doesn\'t know how to connect to "{dst_eid}" node')
except ConnectFailedError:
logging.info(f'BP Agent failed to connect to "{dst_eid}" node')
logger.info(f'BP Agent failed to connect to "{dst_eid}" node')
async def _create_and_send_bundle_msg(self, source, destination, payload):
data = misc.serialize_bundle(source, destination, payload)
......@@ -157,18 +160,18 @@ class BPAgent:
else:
start_time = self._get_earliest_contact_start_time(via_eid)
if start_time is not None:
logging.info(f'BP Agent is scheduling job to send bundle to "{via_eid}"')
logger.info(f'BP Agent is scheduling job to send bundle to "{via_eid}"')
self._scheduler.schedule_job(start_time, self._send_bundle(via_eid, data))
else:
logging.warning(f'There is no scheduled contact for node "{via_eid}", removing bundle') # TODO: Try to use different route
logger.warning(f'There is no scheduled contact for node "{via_eid}", removing bundle') # TODO: Try to use different route
raise NoContactError(via_eid)
async def create_and_send_bundle_msg(self, source, destination, payload):
logging.info(f'BP Agent received request to send bundle (src="{source}", dst="{destination}")')
logger.info(f'BP Agent received request to send bundle (src="{source}", dst="{destination}")')
await self._create_and_send_bundle_msg(source, destination, payload)
async def create_and_send_config_msg(self, target_eid, eid, cla_address, start_offset, end_offset):
logging.info('BP Agent received request to send uPCN config message')
logger.info('BP Agent received request to send uPCN config message')
inner_msg = misc.serialize_upcn_config_message(
eid,
cla_address,
......@@ -231,11 +234,11 @@ class BPAgent:
data = data[stream_len:]
async def _recv_callback(self, conn):
logging.info(f'BP Agent started to receive bundle')
logger.info(f'BP Agent started to receive bundle')
added_data = b''
while True:
data, first_bundle_segment, last_bundle_segment = await conn.recv()
logging.debug(f'BP Agent received data of length {len(data)}')
logger.debug(f'BP Agent received data of length {len(data)}')
added_data += data
try:
bp_client, app_eid, src_eid = self._determine_bp_client_and_src_eid(added_data)
......@@ -247,7 +250,7 @@ class BPAgent:
translated_data, total_len = BPAgent._translate_to_upper_layer(added_data)
except BPAgent._CannotTranslateYet:
if last_bundle_segment:
logging.debug(
logger.debug(
f'BP Agent failed to recognize received data as bundle:\n{hexdump(added_data)}')
return
else:
......@@ -257,7 +260,7 @@ class BPAgent:
return
data, first_bundle_segment, last_bundle_segment = await conn.recv()
if last_bundle_segment:
logging.info(f'BP Agent finished receiving bundle')
logger.info(f'BP Agent finished receiving bundle')
added_data += data
class PayloadReader:
......@@ -282,7 +285,7 @@ class BPAgent:
while not last_bundle_segment:
data, first_bundle_segment, last_bundle_segment = await self._reader.recv()
self._reading_finished = True
logging.info(f'BP Agent finished receiving bundle')
logger.info(f'BP Agent finished receiving bundle')
return return_data
......
......@@ -3,6 +3,10 @@
import logging
logger = logging.getLogger(__name__)
_bp_node_id = None
......@@ -12,5 +16,5 @@ def get_bp_node_id():
def set_bp_node_id(eid):
global _bp_node_id
logging.info(f'Setting BP Node EID to "{eid}"')
logger.info(f'Setting BP Node EID to "{eid}"')
_bp_node_id = eid
......@@ -5,6 +5,9 @@ from .convergence_layers.api import ConvergenceLayerAdapter, ConnectFailedError
from ..util.exceptions import AlreadyRegisteredError
logger = logging.getLogger(__name__)
class NoEndpointsForEIDError(Exception):
    # Raised by ConnectionManager.try_to_send when no convergence-layer
    # adapter knows a remote endpoint for the destination EID; BPAgent
    # catches it and logs "doesn't know how to connect" (see _send_bundle).
    """No convergence-layer endpoint is registered for the requested EID."""
    pass
......@@ -31,14 +34,14 @@ class ConnectionManager:
def _add_connection(self, conn):
if conn in self._connections:
return
logging.debug('ConnectionManager: Adding connection to the list of existing connections')
logger.debug('ConnectionManager: Adding connection to the list of existing connections')
self._connections.append(conn)
def _remove_connection(self, conn):
if conn not in self._connections:
logging.warning('ConnectionManager: Trying to remove connection that isn\'t registered')
logger.warning('ConnectionManager: Trying to remove connection that isn\'t registered')
return
logging.debug('ConnectionManager: Removing connection from the list of existing connections')
logger.debug('ConnectionManager: Removing connection from the list of existing connections')
self._connections.remove(conn)
async def request_handler(self, conn):
......@@ -53,7 +56,7 @@ class ConnectionManager:
for cl_adapter in self._cl_adapter_instances])
async def _connect(self, remote_eid):
logging.info(f'ConnectionManager: Connecting to "{remote_eid}" node')
logger.info(f'ConnectionManager: Connecting to "{remote_eid}" node')
some_conn_failed = False
for adapter in self._cl_adapter_instances:
if any(adapter.get_remote_endpoints_for_eid(remote_eid)):
......@@ -64,7 +67,7 @@ class ConnectionManager:
conn = await adapter.connect(remote_endpoint)
except ConnectFailedError:
some_conn_failed = True
logging.info(f'ConnectionManager: Failed to connect to "{remote_eid}" node')
logger.info(f'ConnectionManager: Failed to connect to "{remote_eid}" node')
continue
self._add_connection(conn)
return conn
......@@ -79,11 +82,11 @@ class ConnectionManager:
return self._connections[0]
async def try_to_send(self, dst_eid, bundle):
logging.debug(f'ConnectionManager: Trying to send bundle to "{dst_eid}"')
logger.debug(f'ConnectionManager: Trying to send bundle to "{dst_eid}"')
conn = self._pick_a_connection()
if conn is None:
logging.debug('ConnectionManager: No connection exists, creating one')
logger.debug('ConnectionManager: No connection exists, creating one')
conn = await self._connect(dst_eid)
else:
logging.debug('ConnectionManager: Using existing connection')
logger.debug('ConnectionManager: Using existing connection')
await conn.send(bundle)
import logging
from abc import ABC, abstractmethod
from ..bp_node import get_bp_node_id
......
# import logging
import logging
from .api import ConvergenceLayerConnection
# from ...util.hexdump import hexdump
from ...util.io_util import WritebackReader, read_exact
logger = logging.getLogger(__name__)
class StreamedConvergenceLayerConnection(ConvergenceLayerConnection):
def __init__(self, reader, writer):
super().__init__()
......@@ -13,7 +16,7 @@ class StreamedConvergenceLayerConnection(ConvergenceLayerConnection):
self._writer = writer
async def _send(self, data):
# logging.debug('StreamedConvergenceLayerConnection: Writing to stream:\n' + hexdump(data))
# logger.debug('StreamedConvergenceLayerConnection: Writing to stream:\n' + hexdump(data))
self._writer.write(data)
await self._writer.drain()
......
......@@ -9,6 +9,9 @@ from ...encoding.tcpcl import ObjDef as TCPCLObjDef, MessageType, ContactMessage
from ...util import uio
logger = logging.getLogger(__name__)
class RemoteTCPEndpoint(RemoteEndpoint):
def __init__(self, endpoint):
super().__init__(endpoint)
......@@ -27,7 +30,7 @@ class TCPCL(ConvergenceLayerAdapter):
async def connect(self, endpoint):
assert isinstance(endpoint, RemoteTCPEndpoint)
logging.info(f'TCPCL: Connecting to endpoint: {endpoint.host}:{endpoint.port}')
logger.info(f'TCPCL: Connecting to endpoint: {endpoint.host}:{endpoint.port}')
try:
reader, writer = await uio.open_connection(endpoint.host, endpoint.port)
except ConnectionRefusedError or TimeoutError:
......@@ -43,21 +46,21 @@ class TCPCL(ConvergenceLayerAdapter):
async def _request_handler(self, reader, writer):
# TODO: Don't close the connection, but repeatedly call self._upper_layer_handler for every new bundle prefix
logging.info(f'TCPCL: Accepted connection')
logger.info(f'TCPCL: Accepted connection')
conn = TCPCLConnection(reader, writer)
try:
await conn.start()
await self._upper_layer_handler(conn)
except Exception:
logging.exception('Exception occurred inside _request_handler')
logger.exception('Exception occurred inside _request_handler')
finally:
conn.close() # TODO: Don't automatically close connection after one bundle has been received
async def run_forever(self, upper_layer_handler):
self._upper_layer_handler = upper_layer_handler
logging.info(f'TCPCL: listening on {self._host}:{self._port}')
logger.info(f'TCPCL: listening on {self._host}:{self._port}')
await uio.serve_forever(self._request_handler, self._host, self._port)
logging.info('TCPCL: exited')
logger.info('TCPCL: exited')
class TCPCLConnection(StreamedConvergenceLayerConnection):
......@@ -69,14 +72,14 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
try:
value, bytes_decoded = sdnv_decode(buffer=data)
except ValueError:
logging.info('Error reading SDNV: Unexpected end of input!')
logger.info('Error reading SDNV: Unexpected end of input!')
raise self.ProtocolError
self._unrecv(data[bytes_decoded:])
return value
async def _send_contact_msg(self):
contact_msg = ContactMessage(self.our_eid).serialize()
# logging.debug(f'TCPCL: Sending contact header:\n {contact_msg}')
# logger.debug(f'TCPCL: Sending contact header:\n {contact_msg}')
await self._send(contact_msg)
class ContactMessageReadError(Exception):
......@@ -91,7 +94,7 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
async def _recv_contact_msg(self):
data = await self._recv_exact(len(TCPCLObjDef.MAGIC))
if data != TCPCLObjDef.MAGIC:
logging.info('Got wrong MAGIC in contact message!')
logger.info('Got wrong MAGIC in contact message!')
raise self.ContactMessageReadError
# Skip directly to Node ID
......@@ -102,7 +105,7 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
try:
len_of_payload, bytes_decoded = sdnv_decode(buffer=data)
except ValueError:
logging.info('Unexpected end of contact message!')
logger.info('Unexpected end of contact message!')
raise self.ContactMessageReadError
self._unrecv(data[bytes_decoded:])
......@@ -111,7 +114,7 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
peer_eid = data.decode("ascii")
except UnicodeDecodeError:
peer_eid = data
logging.debug(f'Received peer EID: {peer_eid}')
logger.debug(f'Received peer EID: {peer_eid}')
# TODO: If connection created by us, compare to EID of registered target endpoint
async def _send_segment(self, msg_type, msg_flags, content_bytes):
......@@ -135,7 +138,7 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
msg_type = MessageType(header[0] >> 4)
msg_flags = header[0] & 0xf
logging.debug(f'TCPCL: Received segment of type {msg_type.name}')
logger.debug(f'TCPCL: Received segment of type {msg_type.name}')
if msg_type == MessageType.DATA_SEGMENT:
payload_len = await self._recv_sdnv()
......@@ -169,7 +172,7 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
async def start(self):
await self._send_contact_msg()
await self._recv_contact_msg()
logging.info('TCPCL: Contact messages exchanged, connection ready to be used')
logger.info('TCPCL: Contact messages exchanged, connection ready to be used')
async def send(self, data, bundle_start=True, bundle_end=True):
"""Send data as one bundle data segment
......@@ -200,5 +203,5 @@ class TCPCLConnection(StreamedConvergenceLayerConnection):
return content, first_bundle_segment, last_bundle_segment
def close(self):
logging.info(f'TCPCL: Closing connection')
logger.info(f'TCPCL: Closing connection')
super().close()
......@@ -45,6 +45,10 @@ from cbor.cbor import CBOR_ARRAY
from .crc import crc16_x25, crc32
logger = logging.getLogger(__name__)
__all__ = [
'BundleProcFlag',
'CRCType',
......@@ -234,7 +238,7 @@ def print_hex(binary):
"""Prints hexadecimal representation of a binary input in one line"""
in_hex = ["0x{:02x}, ".format(b) for b in binary]
in_hex.append("\n")
logging.debug("".join(in_hex))
logger.debug("".join(in_hex))
class PrimaryBlock(object):
......
# import logging
import logging
import struct
from enum import IntEnum
......@@ -6,6 +6,9 @@ from enum import IntEnum
from .sdnv import sdnv_encode
logger = logging.getLogger(__name__)
class ObjDef(object):
"""
Definitions specific for TCPCL (https://tools.ietf.org/html/rfc7242)
......@@ -41,6 +44,6 @@ class ContactMessage(object):
contact_msg_as_bytes += struct.pack("!H", 0) # keepalive
contact_msg_as_bytes += sdnv_encode(self.eidLen)
contact_msg_as_bytes += self.eid.encode("ascii")
# logging.debug(hexdump(contact_msg_as_bytes))
# logger.debug(hexdump(contact_msg_as_bytes))
return contact_msg_as_bytes
import asyncio
import logging.config
import time
import pkg_resources
import yaml
......@@ -9,6 +8,9 @@ from .bundle_protocol import bp_agent
from .bundle_protocol.bp_node import set_bp_node_id
logger = logging.getLogger(__name__)
def load_config(config):
filename = pkg_resources.resource_filename('pydtn', 'pydtn.yaml')
with open(filename, 'rt') as file:
......@@ -17,18 +19,22 @@ def load_config(config):
return {**default_config, **config}
async def _start(config=None):
def setup_logging(log_config):
    """Configure the logging subsystem.

    Args:
        log_config: A mapping in ``logging.config.dictConfig`` schema
            (typically the ``log`` section of the pydtn YAML config).
    """
    configure = logging.config.dictConfig
    configure(log_config)
async def start(config=None):
config = load_config(config)
logging.config.dictConfig(config['log'])
setup_logging(config['log'])
logging.debug(f'pydtn config is:\n{yaml.safe_dump(config)}')
logger.debug(f'pydtn config is:\n{yaml.safe_dump(config)}')
node_config = config['node']
node_id = node_config['id']
if node_id.endswith('default.dtn'):
logging.warning('Using default pyDTN config')
logger.warning('Using default pyDTN config')
set_bp_node_id(node_id)
......@@ -38,19 +44,5 @@ async def _start(config=None):
await bp_agent.run_forever()
_running = False
async def start(config=None):
global _running
if _running:
return
_running = True
try:
await _start(config)
finally:
_running = False
if __name__ == '__main__':
asyncio.run(start())
......@@ -12,9 +12,10 @@ node:
log:
version: 1
disable_existing_loggers: false
formatters:
standard:
format: "%(asctime)s - [%(levelname)-5s] - %(message)s"
format: "%(asctime)s - [%(name)s] - [%(levelname)-5s] - %(message)s"
handlers:
console:
class: logging.StreamHandler
......@@ -26,11 +27,8 @@ log:
level: DEBUG
formatter: standard
filename: pydtn.log
root:
level: DEBUG
handlers: [console, file]
loggers:
pydtn:
level: DEBUG
handlers: [console, file]
propogate: no
propagate: no
......@@ -3,6 +3,9 @@ import logging
import time
logger = logging.getLogger(__name__)
class AIOScheduler:
def __init__(self):
self.aio_tasks = []
......@@ -15,7 +18,7 @@ class AIOScheduler:
return loop.call_later(sleep_time, fun_or_coro)
async def waiting_and_job():
logging.info(f'Job scheduled to run in {round(sleep_time)} sec')
logger.info(f'Job scheduled to run in {round(sleep_time)} sec')
await asyncio.sleep(sleep_time)
await fun_or_coro
......
......@@ -10,6 +10,9 @@ from pydtn.bundle_protocol import bp_agent
from pydtn.bundle_protocol.bp import NoContactError
logger = logging.getLogger(__name__)
def load_config(config):
filename = pkg_resources.resource_filename('pydtn_rest', 'pydtn_rest.yaml')
with open(filename, 'rt') as file:
......@@ -23,52 +26,42 @@ routes = web.RouteTableDef()
@routes.post('/config')
async def handle_config(request):
logging.info('HTTPD: Got request on /config')
logger.info('HTTP: Got request on /config')
try:
json = await request.json()
json = await request.json()
if 'messages' not in json:
raise web.HTTPBadRequest
if 'messages' not in json:
raise web.HTTPBadRequest
try:
for msg in json['messages']:
await bp_agent.create_and_send_config_msg('dtn://ops-sat.dtn/config',
msg['eid'], msg['cla_address'],
int(msg['start_offset']),
int(msg['end_offset']))
return web.json_response({'success': True})
except NoContactError as e:
return web.json_response({'success': False,
'details': f'There is no scheduled contact for node "{e.eid}"'})
except Exception:
logging.exception('Exception occurred while processing HTTP request')
return web.json_response({'success': False,
'details': 'Internal error'})
return web.json_response({'success': True})
@routes.post('/bundle')
async def handle_bundle(request):
logging.info('HTTPD: Got request on /bundle')
logger.info('HTTP: Got request on /bundle')
try:
data = await request.text()
data = await request.text()
source = request.rel_url.query['source']
destination = request.rel_url.query['destination']
source = request.rel_url.query['source']
destination = request.rel_url.query['destination']
try:
await bp_agent.create_and_send_bundle_msg(source, destination, data)
return web.json_response({'success': True})
except NoContactError as e:
return web.json_response({'success': False,
'details': f'There is no scheduled contact for node "{e.eid}"'})
except Exception:
logging.exception('Exception occurred while processing HTTP request')
return web.json_response({'success': False,
'details': 'Internal error'})
return web.json_response({'success': True})
async def run_app(app, host, port):
......@@ -83,22 +76,25 @@ async def run_app(app, host, port):
await runner.cleanup()
def setup_logging(log_config):
    """Apply a dictConfig-style logging configuration.

    Args:
        log_config: Mapping following the ``logging.config.dictConfig``
            schema, taken from the ``log`` section of pydtn_rest's config.
    """
    logging.config.dictConfig(dict(log_config))
async def _start(config=None):
config = load_config(config)
logging.config.dictConfig(config['log'])
setup_logging(config['log'])
logging.debug(f'pydtn_rest config is:\n{yaml.safe_dump(config)}')
logger.debug(f'pydtn_rest config is:\n{yaml.safe_dump(config)}')
app = web.Application()
app.add_routes(routes)
api_config = config['api']
host = api_config['host']
port = api_config['port']
logging.info(f'REST endpoint starting on {host}:{port}')
host = config['api']['host']
port = config['api']['port']
logger.info(f'REST endpoint starting on {host}:{port}')
await run_app(app, host, port)
logging.info('REST endpoint exited')
logger.info('REST endpoint exited')
async def start(config=None):
......
......@@ -8,9 +8,10 @@ api:
log:
version: 1
disable_existing_loggers: false
formatters:
standard:
format: "%(asctime)s - [%(levelname)-5s] - %(message)s"
format: "%(asctime)s - [%(name)s] - [%(levelname)-5s] - %(message)s"
handlers:
console:
class: logging.StreamHandler
......@@ -22,11 +23,8 @@ log:
level: DEBUG
formatter: standard
filename: pydtn_rest.log
root:
level: DEBUG
handlers: [console, file]
loggers:
pydtn_rest:
level: DEBUG
handlers: [console, file]
propogate: no
propagate: no
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment