Commit 67dd49a6 authored by Juraj Sloboda's avatar Juraj Sloboda Committed by Matej Feder

Fix and simplify bundle decoding

parent b74488be
import logging import logging
from pydtn.bundle_protocol import bp_agent from pydtn.bundle_protocol import bp_agent
from pydtn.bundle_protocol.bp import EndOfData
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def recv_callback(ctx, app_eid, src_eid): async def recv_callback(adu, adu_len, app_eid, src_eid):
logger.info(f'BP client "{app_eid}" receiving ADU from "{src_eid}"') logger.info(f'BP client "{app_eid}" received ADU from of length {adu_len} from "{src_eid}"')
while True: print(adu.decode('ascii'))
try:
data = await bp_agent.receive(ctx)
except EndOfData:
break
print(data.decode('ascii'), end='', flush=True)
print()
def start(config): def start(config):
......
import datetime import datetime
import logging import logging
import time import time
from io import BytesIO
from cbor2 import CBORDecoder from cbor2 import CBORDecoder, CBORDecodeError
from ..encoding.bundle7 import EID from ..encoding.bundle7 import EID
from ..util.aio_scheduler import AIOScheduler from ..util.aio_scheduler import AIOScheduler
from ..util.exceptions import AlreadyRegisteredError from ..util.exceptions import AlreadyRegisteredError
from ..util.io_util import CountingBytesIO
from ..util.hexdump import hexdump from ..util.hexdump import hexdump
from . import convergence_layers from . import convergence_layers
...@@ -20,15 +20,6 @@ from . import misc ...@@ -20,15 +20,6 @@ from . import misc
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class EndOfData(Exception):
pass
class ClientNotRegistered(Exception):
def __init__(self, eid):
self.eid = eid
class NoContactError(Exception): class NoContactError(Exception):
def __init__(self, eid): def __init__(self, eid):
self.eid = eid self.eid = eid
...@@ -151,6 +142,8 @@ class BPAgent: ...@@ -151,6 +142,8 @@ class BPAgent:
logger.info(f'BP Agent failed to connect to "{dst_eid}" node') logger.info(f'BP Agent failed to connect to "{dst_eid}" node')
async def _create_and_send_bundle_msg(self, source, destination, payload): async def _create_and_send_bundle_msg(self, source, destination, payload):
if isinstance(payload, str):
payload = payload.encode('ascii')
data = misc.serialize_bundle(source, destination, payload) data = misc.serialize_bundle(source, destination, payload)
via_eid = self._get_via_eid(destination) via_eid = self._get_via_eid(destination)
if via_eid is None: if via_eid is None:
...@@ -181,112 +174,61 @@ class BPAgent: ...@@ -181,112 +174,61 @@ class BPAgent:
) )
await self._create_and_send_bundle_msg(get_bp_node_id(), target_eid, inner_msg) await self._create_and_send_bundle_msg(get_bp_node_id(), target_eid, inner_msg)
@staticmethod async def _recv_callback(self, conn):
async def receive(context): logger.info('BP Agent is receiving bundle')
assert(isinstance(context, BPAgent.PayloadReader)) last_bundle_segment = False
return await context.read() bundle_data = b''
while not last_bundle_segment:
class _CannotExtractEIDsFromDataYet(Exception): data_segment, _, last_bundle_segment = await conn.recv()
pass bundle_data += data_segment
fp = BytesIO(bundle_data)
class _CannotDetermineBpClientAndSrcEIDYet(Exception):
pass
class _CannotTranslateYet(Exception):
pass
@staticmethod
def _decode_bundle_block(data):
fp = CountingBytesIO(data)
decoder = CBORDecoder(fp) decoder = CBORDecoder(fp)
obj = decoder.decode()
len = fp.position
return obj, len
@staticmethod
def _extract_dst_and_src_eids_from_data(data):
try: try:
block, len = BPAgent._decode_bundle_block(data[1:]) bundle = decoder.decode()
except Exception: except CBORDecodeError as e:
raise BPAgent._CannotExtractEIDsFromDataYet logger.warning(f'Error in CBOR format of received bundle: {str(e)}')
return str(EID(block[3])), str(EID(block[4])) return
def _determine_bp_client_and_src_eid(self, data):
try: try:
client_eid, src_eid = self._extract_dst_and_src_eids_from_data(data) assert isinstance(bundle, list)
except BPAgent._CannotExtractEIDsFromDataYet: assert len(bundle) >= 2
raise BPAgent._CannotDetermineBpClientAndSrcEIDYet
client = self._get_client_by_eid(client_eid) primary_block = bundle[0]
if client is None: assert isinstance(primary_block, list)
raise ClientNotRegistered(client_eid) assert len(primary_block) > 4
return self._get_client_by_eid(client_eid), client_eid, src_eid
src_eid_block = primary_block[4]
@staticmethod assert isinstance(src_eid_block, list)
def _translate_to_upper_layer(data): assert len(src_eid_block) == 2
data = data[1:] src_eid = str(EID(src_eid_block))
while True:
try: dst_eid_block = primary_block[3]
block, stream_len = BPAgent._decode_bundle_block(data) assert isinstance(dst_eid_block, list)
except Exception: assert len(dst_eid_block) == 2
raise BPAgent._CannotTranslateYet dst_eid = str(EID(dst_eid_block))
if block[0] == 1:
return block[5].encode('ascii'), block[4] bp_client = self._get_client_by_eid(dst_eid)
data = data[stream_len:] if bp_client is None:
logger.warning(f'BP Agent received bundle with unknown EID ({dst_eid}) as destination')
async def _recv_callback(self, conn): return
logger.info(f'BP Agent started to receive bundle')
added_data = b'' payload_block = bundle[-1]
while True: assert isinstance(payload_block, list)
data, first_bundle_segment, last_bundle_segment = await conn.recv() assert len(payload_block) > 5
logger.debug(f'BP Agent received data of length {len(data)}') assert isinstance(payload_block[0], int)
added_data += data assert payload_block[0] == 1
try:
bp_client, app_eid, src_eid = self._determine_bp_client_and_src_eid(added_data) payload = payload_block[5]
except BPAgent._CannotDetermineBpClientAndSrcEIDYet: assert isinstance(payload, bytes)
pass
else: except AssertionError or ValueError:
while True: logger.warning(f'Error in bundle format of received bundle')
try: logger.debug(f'Received data:\n{hexdump(bundle_data)}')
translated_data, total_len = BPAgent._translate_to_upper_layer(added_data) return
except BPAgent._CannotTranslateYet:
if last_bundle_segment: logger.debug('Calling BP client receive callback')
logger.debug( await bp_client._recv_callback(payload, len(payload), dst_eid, src_eid)
f'BP Agent failed to recognize received data as bundle:\n{hexdump(added_data)}') logger.debug('BP client receive callback finished')
return
else:
await bp_client._recv_callback(
self.PayloadReader(translated_data, last_bundle_segment, conn, total_len),
app_eid, src_eid)
return
data, first_bundle_segment, last_bundle_segment = await conn.recv()
if last_bundle_segment:
logger.info(f'BP Agent finished receiving bundle')
added_data += data
class PayloadReader:
def __init__(self, initial_data, data_complete, reader, n_to_read):
self._initial_data = initial_data
self._reading_finished = data_complete
self._reader = reader
self._n_to_read = n_to_read
async def read(self):
if self._initial_data is not None:
return_data = self._initial_data
self._initial_data = None
return return_data
if self._reading_finished:
raise EndOfData
data, first_bundle_segment, last_bundle_segment = await self._reader.recv()
if len(data) < self._n_to_read:
self._n_to_read -= len(data)
return data
return_data = data[:self._n_to_read]
while not last_bundle_segment:
data, first_bundle_segment, last_bundle_segment = await self._reader.recv()
self._reading_finished = True
logger.info(f'BP Agent finished receiving bundle')
return return_data
class BPPrivateAPI: class BPPrivateAPI:
......
from io import BytesIO
class CountingBytesIO(BytesIO):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.position = 0
def read(self, size):
data = super().read(size)
self.position += len(data)
return data
class WritebackReader: class WritebackReader:
def __init__(self, reader): def __init__(self, reader):
self._reader = reader self._reader = reader
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment