Commit b7701c2b authored by Juraj Sloboda's avatar Juraj Sloboda Committed by Matej Feder

Restructure project folders

parent 93794d05
import argparse import argparse
import logging import logging
from bundle_protocol import bp_agent from pydtn.bundle_protocol.convergence_layers.tcp_cl import RemoteTCPEndpoint
from bundle_protocol.bp_node import set_bp_node_id from pydtn.bundle_protocol import bp_agent
from bundle_protocol.convergence_layers.tcp_cl import TCPCLAdapter, RemoteTCPEndpoint from pydtn.bundle_protocol.bp import EndOfData
from httpd import httpd from pydtn import start as bp_start
from app import app from pydtn_rest import start as rest_start
from util.concurrency import async_run
logging.basicConfig(format='[%(asctime)s] %(levelname)s: %(message)s', level=logging.DEBUG) logging.basicConfig(format='[%(asctime)s] %(levelname)s: %(message)s', level=logging.DEBUG)
async def main(): our_eid = None
bp_client = None
async def recv_callback(ctx, src_eid):
global our_eid
logging.info(f'BP client "{our_eid}" receiving ADU from "{src_eid}"')
while True:
try:
data = await bp_agent.receive(ctx)
except EndOfData:
break
print(data.decode('ascii'), end='', flush=True)
print()
if __name__ == '__main__':
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--node", type=str) parser.add_argument("--node", type=str)
parser.add_argument("--port", type=int, default=42420) parser.add_argument("--port", type=int, default=42420)
...@@ -20,29 +35,21 @@ async def main(): ...@@ -20,29 +35,21 @@ async def main():
parser.add_argument("--upcn-eid", type=str, default='dtn://ops-sat.dtn') parser.add_argument("--upcn-eid", type=str, default='dtn://ops-sat.dtn')
parser.add_argument("--upcn-host", type=str, default="127.0.0.1") parser.add_argument("--upcn-host", type=str, default="127.0.0.1")
parser.add_argument("--upcn-port", type=int, default=4556) parser.add_argument("--upcn-port", type=int, default=4556)
args = parser.parse_args()
set_bp_node_id(args.node) args = parser.parse_args()
# Initialize TCP convergence layer adapter # Start Bundle protocol server
tcp_cla = TCPCLAdapter(port=args.port) bp_start(args.node, args.port)
bp_agent.register_cl_adapter(tcp_cla)
# Register uPCN node as neighbor accessible through TCP # Register uPCN node as neighbor accessible through TCP
tcp_cla.register_remote_endpoint(RemoteTCPEndpoint(args.upcn_eid, args.upcn_host, args.upcn_port)) bp_agent.register_remote_endpoint(RemoteTCPEndpoint(args.upcn_eid, args.upcn_host, args.upcn_port))
# Send all traffic through uPCN node # Send all traffic through uPCN node
bp_agent.add_route(None, args.upcn_eid) bp_agent.add_route(None, args.upcn_eid)
# Start example application that prints received bundle payloads
app.start(args)
# Start REST endpoint # Start REST endpoint
await httpd.start(args) rest_start(args.http_port)
# Start Bundle protocol server
await bp_agent.run_forever(args)
# Start example application that prints received bundle payloads
if __name__ == '__main__': our_eid = args.node
async_run(main()) bp_app = bp_agent.register(our_eid, recv_callback)
...@@ -13,7 +13,11 @@ services: ...@@ -13,7 +13,11 @@ services:
tty: true tty: true
restart: always restart: always
volumes: volumes:
- ../src:/pyDTN - ../pydtn:/pydtn
- ../pydtn_rest:/pydtn_rest
- ../demo_app:/demo_app
environment:
- PYTHONPATH=/pydtn:/pydtn_rest:/demo_app
command: --node dtn://pyDTN-1.dtn --port 2001 --http-port 8081 --upcn-host 172.25.1.21 --upcn-port 4556 command: --node dtn://pyDTN-1.dtn --port 2001 --http-port 8081 --upcn-host 172.25.1.21 --upcn-port 4556
networks: networks:
dtnnet1: dtnnet1:
...@@ -29,7 +33,11 @@ services: ...@@ -29,7 +33,11 @@ services:
tty: true tty: true
restart: always restart: always
volumes: volumes:
- ../src:/pyDTN - ../pydtn:/pydtn
- ../pydtn_rest:/pydtn_rest
- ../demo_app:/demo_app
environment:
- PYTHONPATH=/pydtn:/pydtn_rest:/demo_app
command: --node dtn://pyDTN-2.dtn --port 2002 --http-port 8082 --upcn-host 172.25.2.21 --upcn-port 4556 command: --node dtn://pyDTN-2.dtn --port 2002 --http-port 8082 --upcn-host 172.25.2.21 --upcn-port 4556
networks: networks:
dtnnet2: dtnnet2:
......
...@@ -4,9 +4,6 @@ FROM python:3 ...@@ -4,9 +4,6 @@ FROM python:3
# Update aptitude with new repo # Update aptitude with new repo
RUN apt-get update RUN apt-get update
RUN apt-get install -y python3-pip joe tcpdump RUN apt-get install -y python3-pip joe tcpdump
COPY requirements.txt / RUN pip install cbor~=1.0.0 cbor2~=4.1.2 asyncio~=3.4.3 aiohttp~=3.5.4
RUN pip install -r /requirements.txt
WORKDIR /pyDTN ENTRYPOINT ["python", "-m", "demo_app.main"]
ENTRYPOINT ["python3", "-m", "main"]
from .pydtn import start
import logging import logging
from cbor2 import CBORDecoder from cbor2 import CBORDecoder
from bundle_protocol.convergence_layers.api import ConnectFailedError from ..encoding.bundle7 import EID
from encoding.bundle7 import EID from ..util.exceptions import AlreadyRegisteredError
from bundle_protocol.bp_node import get_bp_node_id from ..util.io_util import CountingBytesIO
from bundle_protocol.conn_manager import ConnectionManager, UnknownEID, NoContact from ..util.hexdump import hexdump
from bundle_protocol import misc
from util.exceptions import AlreadyRegisteredError from .convergence_layers.api import ConnectFailedError
from util.io_util import CountingBytesIO from .bp_node import get_bp_node_id
from util.hexdump import hexdump from .conn_manager import ConnectionManager, UnknownEID, NoContact
from . import misc
class EndOfData(Exception): class EndOfData(Exception):
...@@ -30,7 +31,10 @@ class BPAgent: ...@@ -30,7 +31,10 @@ class BPAgent:
def register_cl_adapter(self, adapter): def register_cl_adapter(self, adapter):
self._conn_manager.register_cl_adapter(adapter) self._conn_manager.register_cl_adapter(adapter)
async def run_forever(self, args): def register_remote_endpoint(self, remote_endpoint):
self._conn_manager.register_remote_endpoint(remote_endpoint)
async def run_forever(self):
await self._conn_manager.run_forever() await self._conn_manager.run_forever()
def register(self, eid, recv_callback): def register(self, eid, recv_callback):
......
import asyncio
from typing import Type from typing import Type
import logging import logging
from bundle_protocol.convergence_layers.api import ConvergenceLayerAdapter, ConnectFailedError from .convergence_layers.api import ConvergenceLayerAdapter, ConnectFailedError
from util.concurrency import run_concurrently from ..util.exceptions import AlreadyRegisteredError
from util.exceptions import AlreadyRegisteredError
class UnknownEID(Exception): class UnknownEID(Exception):
...@@ -45,6 +45,11 @@ class ConnectionManager: ...@@ -45,6 +45,11 @@ class ConnectionManager:
else: else:
raise ValueError raise ValueError
def register_remote_endpoint(self, remote_endpoint):
for adapter in self._cl_adapter_instances:
if isinstance(adapter, remote_endpoint.cla_class):
adapter.register_remote_endpoint(remote_endpoint)
def _add_connection(self, conn): def _add_connection(self, conn):
if conn in self._connections: if conn in self._connections:
return return
...@@ -64,8 +69,8 @@ class ConnectionManager: ...@@ -64,8 +69,8 @@ class ConnectionManager:
self._remove_connection(conn) # TODO: Don't automatically close connection after one bundle has been received self._remove_connection(conn) # TODO: Don't automatically close connection after one bundle has been received
async def run_forever(self): async def run_forever(self):
await run_concurrently(*(cl_adapter.run_forever(self.request_handler) await asyncio.wait([asyncio.create_task(cl_adapter.run_forever(self.request_handler))
for cl_adapter in self._cl_adapter_instances)) for cl_adapter in self._cl_adapter_instances])
async def _connect(self, remote_eid): async def _connect(self, remote_eid):
logging.info(f'ConnectionManager: Connecting to "{remote_eid}" node') logging.info(f'ConnectionManager: Connecting to "{remote_eid}" node')
......
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from bundle_protocol.bp_node import get_bp_node_id from ..bp_node import get_bp_node_id
from util.configure import Configurable from ...util.configure import Configurable
from util.exceptions import AlreadyRegisteredError from ...util.exceptions import AlreadyRegisteredError
# TODO: Choose different name to avoid confusion with 'Bundle endpoint' which is a different concept # TODO: Choose different name to avoid confusion with 'Bundle endpoint' which is a different concept
......
# import logging # import logging
from bundle_protocol.convergence_layers.api import ConvergenceLayerConnection from .api import ConvergenceLayerConnection
# from util.hexdump import hexdump # from ...util.hexdump import hexdump
from util.io_util import WritebackReader, read_exact from ...util.io_util import WritebackReader, read_exact
class StreamedConvergenceLayerConnection(ConvergenceLayerConnection): class StreamedConvergenceLayerConnection(ConvergenceLayerConnection):
......
...@@ -2,11 +2,11 @@ ...@@ -2,11 +2,11 @@
import logging import logging
from bundle_protocol.convergence_layers.api import ConvergenceLayerAdapter, RemoteEndpoint, ConnectFailedError from .api import ConvergenceLayerAdapter, RemoteEndpoint, ConnectFailedError
from bundle_protocol.convergence_layers.streamed_cl_base import StreamedConvergenceLayerConnection from .streamed_cl_base import StreamedConvergenceLayerConnection
from encoding.sdnv import sdnv_decode, sdnv_encode from ...encoding.sdnv import sdnv_decode, sdnv_encode
from encoding.tcpcl import ObjDef as TCPCLObjDef, MessageType, ContactMessage from ...encoding.tcpcl import ObjDef as TCPCLObjDef, MessageType, ContactMessage
from util import uio from ...util import uio
class TCPCLAdapter(ConvergenceLayerAdapter): class TCPCLAdapter(ConvergenceLayerAdapter):
......
import datetime import datetime
import time import time
from encoding.bundle7 import ( from ..encoding.bundle7 import (
Bundle, Bundle,
PrimaryBlock, PrimaryBlock,
CreationTimestamp, CreationTimestamp,
......
...@@ -42,7 +42,8 @@ from datetime import datetime, tzinfo, timedelta ...@@ -42,7 +42,8 @@ from datetime import datetime, tzinfo, timedelta
from random import random from random import random
import cbor import cbor
from cbor.cbor import CBOR_ARRAY from cbor.cbor import CBOR_ARRAY
from encoding.crc import crc16_x25, crc32
from .crc import crc16_x25, crc32
__all__ = [ __all__ = [
'BundleProcFlag', 'BundleProcFlag',
......
import struct import struct
from encoding.sdnv import sdnv_decode, sdnv_encode from .sdnv import sdnv_decode, sdnv_encode
from util.hexdump import hexdump from ..util.hexdump import hexdump
from util.misc import IntField, ParsingError from ..util.misc import IntField
from ..util.exceptions import ParsingError
class ObjDef: class ObjDef:
......
...@@ -2,8 +2,8 @@ ...@@ -2,8 +2,8 @@
import struct import struct
from enum import IntEnum from enum import IntEnum
# from util.hexdump import hexdump # from ..util.hexdump import hexdump
from encoding.sdnv import sdnv_encode from .sdnv import sdnv_encode
class ObjDef(object): class ObjDef(object):
......
import asyncio
import logging
from .bundle_protocol import bp_agent
from .bundle_protocol.bp_node import set_bp_node_id
from .bundle_protocol.convergence_layers.tcp_cl import TCPCLAdapter
logging.basicConfig(format='[%(asctime)s] %(levelname)s: %(message)s', level=logging.DEBUG)
def start(node_id, tcp_port):
set_bp_node_id(node_id)
# Initialize TCP convergence layer adapter
tcp_cla = TCPCLAdapter(port=tcp_port)
bp_agent.register_cl_adapter(tcp_cla)
# Start Bundle protocol server
loop = asyncio.get_event_loop()
loop.run_until_complete(bp_agent.run_forever())
from datetime import timedelta from datetime import timedelta
from src.encoding.bundle7 import Bundle, BundleAgeBlock, BundleProcFlag, BundleStatusReport, CreationTimestamp, HopCountBlock,\ from ..encoding.bundle7 import Bundle, BundleAgeBlock, BundleProcFlag, BundleStatusReport, CreationTimestamp, HopCountBlock,\
PayloadBlock, PreviousNodeBlock, PrimaryBlock, ReasonCode, StatusCode, dtn_start, print_hex PayloadBlock, PreviousNodeBlock, PrimaryBlock, ReasonCode, StatusCode, dtn_start, print_hex
primary_block = PrimaryBlock( primary_block = PrimaryBlock(
......
from bundle_protocol.misc import unix2dtn, dtn2unix, serialize_bundle, NULL_EID, serialize_upcn_config_message from ..bundle_protocol.misc import unix2dtn, dtn2unix, serialize_bundle, NULL_EID, serialize_upcn_config_message
from bundle_protocol.convergence_layers.tcp_cl import TCPCLConnection from ..bundle_protocol.convergence_layers.tcp_cl import TCPCLConnection
from encoding.sdnv import sdnv_encode, sdnv_decode from ..encoding.sdnv import sdnv_encode, sdnv_decode
class TestCommon: class TestCommon:
......
from .pydtn_rest import start
import logging import logging
from aiohttp import web from aiohttp import web
from bundle_protocol import bp_agent from pydtn.bundle_protocol import bp_agent
from pydtn.bundle_protocol.bp_node import get_bp_node_id
logging.basicConfig(format='[%(asctime)s] %(levelname)s: %(message)s', level=logging.DEBUG)
our_eid = None our_eid = None
bp_app = None bp_client = None
upcn_tcp_endpoint = None upcn_tcp_endpoint = None
routes = web.RouteTableDef() routes = web.RouteTableDef()
...@@ -36,28 +42,17 @@ async def handle_bundle(request): ...@@ -36,28 +42,17 @@ async def handle_bundle(request):
destination = request.rel_url.query['destination'] destination = request.rel_url.query['destination']
await bp_app.send_bundle_msg(destination, data) await bp_client.send_bundle_msg(destination, data)
return web.json_response({'success': True}) return web.json_response({'success': True})
async def handle(request): def start(port, host='0.0.0.0'):
name = request.match_info.get('name', "Anonymous") global bp_client, our_eid
text = "Hello, " + name our_eid = f'{get_bp_node_id()}/web'
return web.Response(text=text) bp_client = bp_agent.register(our_eid, None)
async def start(args):
global bp_app, our_eid
our_eid = f'{args.node}/web'
bp_app = bp_agent.register(our_eid, None)
web_app = web.Application() app = web.Application()
web_app.add_routes(routes) app.add_routes(routes)
runner = web.AppRunner(web_app) web.run_app(app, host=host, port=port)
await runner.setup() logging.info(f'REST endpoint started on {host}:{port}')
host = '0.0.0.0'
port = args.http_port
site = web.TCPSite(runner, host=host, port=port)
await site.start()
logging.info(f'Web server started on {host}:{port}')
import logging
from bundle_protocol import bp_agent
from bundle_protocol.bp import EndOfData
our_eid = None
bp_app = None
async def recv_callback(ctx, src_eid):
global our_eid
logging.info(f'BP client "{our_eid}" receiving ADU from "{src_eid}"')
while True:
try:
data = await bp_agent.receive(ctx)
except EndOfData:
break
print(data.decode('ascii'), end='', flush=True)
print()
def start(args):
global bp_app, our_eid
our_eid = args.node
bp_app = bp_agent.register(our_eid, recv_callback)
cbor~=1.0.0
cbor2~=4.1.2
aiohttp~=3.5.4
import asyncio
async def run_concurrently(*args):
await asyncio.wait([asyncio.create_task(coro) for coro in args])
def async_run(*args, **kwargs):
asyncio.run(*args, **kwargs)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment