Commit a4c1fce3 authored by Juraj Sloboda's avatar Juraj Sloboda

Code review and cleanup

- Rename: HOWTO.txt -> DEMO_HOWTO.txt
- Fix invalid dir in DEMO_HOWTO.txt
- pip: specify versions of required packages
- Use requirements.txt
- Remove support for threads and asyncio examples
- Remove unnecessary 'run_concurrently' statement
- Fix deprecated use of asyncio.wait
parent bbeff3a0
......@@ -22,7 +22,7 @@ docker logs -f docker_uPCN-1_1
# Send bundle
# Alternatively, you can use Postman and import docker/REST/pyDTN.postman_collection.json
cd tools/schedule_contact/
cd tools/send_bundle/
./send.sh
# List running containers including commands running inside
......
......@@ -12,7 +12,7 @@ This is the PoC (Proof of Concept) implementation of:
* [DTN IP Neighbor Discovery (IPND)](https://tools.ietf.org/html/draft-irtf-dtnrg-ipnd-03)
* Send/receive a Beacon, code exists, integration in progress, see %2 milestone
* Send/receive a beacon, code exists, integration in progress, see %2 milestone
# Use Case
......@@ -33,4 +33,4 @@ This is the PoC (Proof of Concept) implementation of:
Issues listed above will be solved as a part of %2 milestone.
# Try it yourself
See [HOWTO.txt](HOWTO.txt) (eventually will be integrated into this file)
See [DEMO_HOWTO.txt](DEMO_HOWTO.txt) (eventually will be integrated into this file)
......@@ -4,7 +4,8 @@ FROM python:3
# Update aptitude with new repo
RUN apt-get update
RUN apt-get install -y python3-pip joe tcpdump
RUN pip install cbor cbor2 aiohttp
COPY requirements.txt /
RUN pip install -r /requirements.txt
WORKDIR /pyDTN
......
cbor~=1.0.0
cbor2~=4.1.2
aiohttp~=3.5.4
......@@ -19,7 +19,7 @@ async def recv_callback(ctx, src_eid):
print()
async def start(args):
def start(args):
global bp_app, our_eid
our_eid = args.node
bp_app = bp_agent.register(our_eid, recv_callback)
async def run_forever(args):
pass
......@@ -4,10 +4,9 @@ import logging
from bundle_protocol import bp_agent
from bundle_protocol.bp_node import set_bp_node_id
from bundle_protocol.convergence_layers.tcp_cl import TCPCLAdapter, RemoteTCPEndpoint
from ipnd import ipnd
from httpd import httpd
from app import app
from util.concurrency import run_concurrently, async_run
from util.concurrency import async_run
logging.basicConfig(format='[%(asctime)s] %(levelname)s: %(message)s', level=logging.DEBUG)
......@@ -25,6 +24,7 @@ async def main():
set_bp_node_id(args.node)
# Initialize TCP convergence layer adapter
tcp_cla = TCPCLAdapter(port=args.port)
bp_agent.register_cl_adapter(tcp_cla)
......@@ -34,12 +34,14 @@ async def main():
# Send all traffic through uPCN node
bp_agent.add_route(None, args.upcn_eid)
await run_concurrently(
bp_agent.run_forever(args),
ipnd.run_forever(args),
httpd.start(args),
app.start(args)
)
# Start example application that prints received bundle payloads
app.start(args)
# Start REST endpoint
await httpd.start(args)
# Start Bundle protocol server
await bp_agent.run_forever(args)
if __name__ == '__main__':
......
cbor~=1.0.0
cbor2~=4.1.2
aiohttp~=3.5.4
import asyncio
from threading import Thread
def thread_v1(fun):
"""Function decorator
Return a new function that
instead of executing the body of the original function
saves the arguments and returns a Thread object
When the thread is started it calls the original function with those arguments
This is to make it similar to coroutine functions (the ones with 'async' keyword)
which when supplied with arguments return coroutine objects
which can be executed using 'await' keyword
"""
def wrapper(*args):
return Thread(target=fun, args=args)
return wrapper
def thread(fun):
"""Function decorator
Return a coroutine function which executes the original function in thread executor
"""
async def cor(*args):
await asyncio.get_event_loop().run_in_executor(None, fun, *args)
return cor
# Example:
#
#
# async def run_function1(a):
# await function1()
#
# async def run_function2():
# await function2()
#
# @concurrency.thread
# def run_function3(a, b):
# function3(a, b)
#
# async def main():
# await run_concurrently(
# run_function1(3),
# run_function2(),
# run_function3(2, 7),
# )
async def run_concurrently(*args):
await asyncio.wait([*args])
# Example:
#
#
# async def main(param_1, .., param_n):
# ...
#
# async_run(main(arg_1, .., arg_n))
# Event loop running in the main thread
_running_loop = None
def async_run(cor, debug=False):
"""Run coroutine in event loop
Create event loop and save a reference to in to be used in async_run_in_main_thread()
Should be called only from the main thread
Modelled after asyncio.run
"""
global _running_loop
try:
asyncio.get_running_loop()
except RuntimeError:
pass
else:
raise RuntimeError("async_run() cannot be called from a running event loop")
if not asyncio.iscoroutine(cor):
raise ValueError("a coroutine was expected, got {!r}".format(cor))
_running_loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(_running_loop)
_running_loop.set_debug(debug)
return _running_loop.run_until_complete(cor)
finally:
asyncio.set_event_loop(None)
_running_loop.close()
await asyncio.wait([asyncio.create_task(coro) for coro in args])
def async_run_in_main_thread(cor):
return asyncio.run_coroutine_threadsafe(cor, _running_loop).result()
def async_run(*args, **kwargs):
asyncio.run(*args, **kwargs)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment