diff --git a/sample_apps/advanced_example_py/advanced_example1.py b/sample_apps/advanced_example_py/advanced_example1.py index 98f7eb94..8c0765e1 100644 --- a/sample_apps/advanced_example_py/advanced_example1.py +++ b/sample_apps/advanced_example_py/advanced_example1.py @@ -1,9 +1,9 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. - import os import sys import ctypes +from dataclasses import dataclass JRTC_APP_PATH = os.environ.get("JRTC_APP_PATH") if JRTC_APP_PATH is None: @@ -21,18 +21,13 @@ ########################################################################## # Define the state variables for the application -class AppStateVars(ctypes.Structure): - _fields_ = [ - ("app", ctypes.POINTER(JrtcApp)), - - # add custom fields below - ("agg_cnt", ctypes.c_int32) - ] - +@dataclass +class AppStateVars: + app: JrtcApp + agg_cnt: int ########################################################################## -# Handler callback function (this function gets called by the C library) -def app_handler(timeout: bool, stream_idx: int, data_entry_ptr: ctypes.POINTER(struct_jrtc_router_data_entry), state_ptr: int): +def app_handler(timeout: bool, stream_idx: int, data_entry: struct_jrtc_router_data_entry, state: AppStateVars): GENERATOR_OUT_STREAM_IDX = 0 APP2_OUT_STREAM_IDX = 1 @@ -45,10 +40,6 @@ def app_handler(timeout: bool, stream_idx: int, data_entry_ptr: ctypes.POINTER(s else: - # Dereference the pointer arguments - state = ctypes.cast(state_ptr, ctypes.POINTER(AppStateVars)).contents - data_entry = data_entry_ptr.contents - if stream_idx == GENERATOR_OUT_STREAM_IDX: # Extract data from the received entry data = ctypes.cast(data_entry.data, ctypes.POINTER(example_msg)).contents @@ -130,7 +121,7 @@ def jrtc_start_app(capsule): ) # Initialize the app - state = AppStateVars(agg_cnt=0) + state = AppStateVars(agg_cnt=0, app=None) state.app = jrtc_app_create(capsule, app_cfg, app_handler, state) # run the app - This is blocking until the app exits diff --git a/sample_apps/advanced_example_py/advanced_example2.py b/sample_apps/advanced_example_py/advanced_example2.py index b34c4916..ae957043 100644 --- a/sample_apps/advanced_example_py/advanced_example2.py +++ b/sample_apps/advanced_example_py/advanced_example2.py @@ -1,9 +1,9 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. - import os import sys import ctypes +from dataclasses import dataclass JRTC_APP_PATH = os.environ.get("JRTC_APP_PATH") if JRTC_APP_PATH is None: @@ -21,18 +21,14 @@ ########################################################################## # Define the state variables for the application -class AppStateVars(ctypes.Structure): - _fields_ = [ - ("app", ctypes.POINTER(JrtcApp)), - - # add custom fields below - ("agg_cnt", ctypes.c_int32), - ("received_counter", ctypes.c_int) - ] +@dataclass +class AppStateVars: + app: JrtcApp + agg_cnt: int + received_counter: int ########################################################################## -# Handler callback function (this function gets called by the C library) -def app_handler(timeout: bool, stream_idx: int, data_entry_ptr: ctypes.POINTER(struct_jrtc_router_data_entry), state_ptr: int): +def app_handler(timeout: bool, stream_idx: int, data_entry: struct_jrtc_router_data_entry, state: AppStateVars): GENERATOR_PB_OUT_STREAM_IDX = 0 APP2_OUT_STREAM_IDX = 1 @@ -43,9 +39,6 @@ def app_handler(timeout: bool, stream_idx: int, data_entry_ptr: ctypes.POINTER(s pass else: - # Dereference the pointer arguments - state = ctypes.cast(state_ptr, ctypes.POINTER(AppStateVars)).contents - data_entry = data_entry_ptr.contents if stream_idx == GENERATOR_PB_OUT_STREAM_IDX: @@ -135,7 +128,7 @@ def jrtc_start_app(capsule): ) # Initialize the app - state = AppStateVars(agg_cnt=0, received_counter=0) + state = AppStateVars(agg_cnt=0, received_counter=0, app=None) state.app = jrtc_app_create(capsule, app_cfg, app_handler, state) # run the app - This is blocking until the app exists diff --git a/sample_apps/first_example_py/first_example.py b/sample_apps/first_example_py/first_example.py index 978d6a66..646a63a9 100644 --- a/sample_apps/first_example_py/first_example.py +++ b/sample_apps/first_example_py/first_example.py @@ -1,9 +1,9 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. - import os import sys import ctypes +from dataclasses import dataclass JRTC_APP_PATH = os.environ.get("JRTC_APP_PATH") if JRTC_APP_PATH is None: @@ -17,22 +17,20 @@ simple_input = sys.modules.get('simple_input') from generated_data import example_msg from simple_input import simple_input +from jrtc_bindings import ( + struct_jrtc_router_data_entry, +) ########################################################################## # Define the state variables for the application -class AppStateVars(ctypes.Structure): - _fields_ = [ - ("app", ctypes.POINTER(JrtcApp)), - - # add custom fields below - ("agg_cnt", ctypes.c_int), - ("received_counter", ctypes.c_int) - ] - +@dataclass +class AppStateVars: + app: JrtcApp + agg_cnt: int + received_counter: int ########################################################################## -# Handler callback function (this function gets called by the C library) -def app_handler(timeout: bool, stream_idx: int, data_entry_ptr: ctypes.POINTER(struct_jrtc_router_data_entry), state_ptr: int): +def app_handler(timeout: bool, stream_idx: int, data_entry: struct_jrtc_router_data_entry, state: AppStateVars): GENERATOR_OUT_STREAM_IDX = 0 SIMPLE_INPUT_IN_STREAM_IDX = 1 @@ -43,10 +41,6 @@ def app_handler(timeout: bool, stream_idx: int, data_entry_ptr: ctypes.POINTER(s else: - # Dereference the pointer arguments - state = ctypes.cast(state_ptr, ctypes.POINTER(AppStateVars)).contents - data_entry = data_entry_ptr.contents - if stream_idx == GENERATOR_OUT_STREAM_IDX: state.received_counter += 1 @@ -112,7 +106,7 @@ def jrtc_start_app(capsule): ) # Initialize the app - state = AppStateVars(agg_cnt=0, received_counter=0) + state = AppStateVars(agg_cnt=0, received_counter=0, app=None) state.app = jrtc_app_create(capsule, app_cfg, app_handler, state) # run the app - This is blocking until the app exists diff --git a/src/pythonapp_loader/jrtc_pythonapp_loader.c b/src/pythonapp_loader/jrtc_pythonapp_loader.c index 8c5d5cf1..8883d0a7 100644 --- a/src/pythonapp_loader/jrtc_pythonapp_loader.c +++ b/src/pythonapp_loader/jrtc_pythonapp_loader.c @@ -360,7 +360,6 @@ jrtc_start_app(void* args) Py_XDECREF(pCapsule); cleanup_gil: - PyGILState_Release(gstate); if (ts1) { if (main_ts != ts1) { @@ -371,6 +370,7 @@ jrtc_start_app(void* args) PyThreadState_Swap(main_ts); Py_EndInterpreter(ts1); } + PyGILState_Release(gstate); if (Py_IsInitialized()) { Py_Finalize(); diff --git a/src/wrapper_apis/python/jrtc_app.py b/src/wrapper_apis/python/jrtc_app.py index 2b5d8a81..843e0158 100644 --- a/src/wrapper_apis/python/jrtc_app.py +++ b/src/wrapper_apis/python/jrtc_app.py @@ -1,14 +1,17 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. - import time import os import sys import ctypes -from ctypes import c_int, c_float, c_char_p, c_void_p, c_bool from dataclasses import dataclass -from typing import Any +from ctypes import ( + c_int, c_float, c_char_p, c_void_p, c_bool, + POINTER, Structure, c_uint16, c_uint64 +) +from dataclasses import dataclass +from typing import Any, Optional # Retrieve the JRTC application path from environment variables JRTC_APP_PATH = os.environ.get("JRTC_APP_PATH") @@ -16,163 +19,251 @@ print("Warning: JRTC_APP_PATH not set") JRTC_APP_PATH = "./" sys.path.append(f"{JRTC_APP_PATH}") -from jrtc_bindings import * -# Import necessary JRTC modules -from jrtc_router_stream_id import ( - JrtcRouterStreamId, - jrtc_router_stream_id_matches_req +from jrtc_bindings import * +from jrtc_router_stream_id import JrtcRouterStreamId, jrtc_router_stream_id_matches_req +from jrtc_wrapper_utils import ( + JrtcAppEnv, + get_ctx_from_capsule, + get_data_entry_array_ptr, ) -from jrtc_wrapper_utils import JrtcAppEnv, get_ctx_from_capsule, get_data_entry_array_ptr from jrtc_router_lib import ( jrtc_router_channel_register_stream_id_req, jrtc_router_channel_deregister_stream_id_req, jrtc_router_receive, - jrtc_router_channel_send_input_msg, + jrtc_router_channel_send_input_msg, jrtc_router_channel_send_output_msg, - jrtc_router_channel_create, - jrtc_router_channel_destroy, + jrtc_router_channel_create, + jrtc_router_channel_destroy, jrtc_router_channel_release_buf, jrtc_router_input_channel_exists, JRTC_ROUTER_REQ_DEST_ANY, JRTC_ROUTER_REQ_DEVICE_ID_ANY, JRTC_ROUTER_REQ_DEST_NONE, JRTC_ROUTER_REQ_STREAM_PATH_ANY, - JRTC_ROUTER_REQ_STREAM_NAME_ANY + JRTC_ROUTER_REQ_STREAM_NAME_ANY, ) - - -# Load the shared C library -lib = ctypes.CDLL(f'{JRTC_APP_PATH}/libjrtc_app.so') - -# Define C structures (using ctypes) - -class JrtcApp(ctypes.Structure): - pass - -class JrtcStreamIdCfg_t(ctypes.Structure): +# Structs +class JrtcStreamIdCfg_t(Structure): _fields_ = [ ("destination", c_int), ("device_id", c_int), ("stream_source", c_char_p), - ("io_map", c_char_p) + ("io_map", c_char_p), ] -class JrtcAppChannelCfg_t(ctypes.Structure): - _fields_ = [ - ("is_output", c_bool), - ("num_elems", c_int), - ("elem_size", c_int) - ] +class JrtcAppChannelCfg_t(Structure): + _fields_ = [("is_output", c_bool), ("num_elems", c_int), ("elem_size", c_int)] -class JrtcStreamCfg_t(ctypes.Structure): +class JrtcStreamCfg_t(Structure): _fields_ = [ ("sid", JrtcStreamIdCfg_t), ("is_rx", c_bool), - ("appChannel", ctypes.POINTER(JrtcAppChannelCfg_t)) + ("appChannel", POINTER(JrtcAppChannelCfg_t)), ] -class JrtcAppCfg_t(ctypes.Structure): +class JrtcAppCfg_t(Structure): _fields_ = [ ("context", c_char_p), ("q_size", c_int), ("num_streams", c_int), - ("streams", ctypes.POINTER(JrtcStreamCfg_t)), + ("streams", POINTER(JrtcStreamCfg_t)), ("initialization_timeout_secs", c_float), ("sleep_timeout_secs", c_float), - ("inactivity_timeout_secs", c_float) + ("inactivity_timeout_secs", c_float), ] -# Callback type definition for the handler function - -JrtcAppHandler = ctypes.CFUNCTYPE(None, c_bool, c_int, ctypes.POINTER(struct_jrtc_router_data_entry), c_void_p) - -# Function prototypes - -lib.jrtc_app_create.argtypes = [ctypes.POINTER(JrtcAppEnv), ctypes.POINTER(JrtcAppCfg_t), JrtcAppHandler, c_void_p] -lib.jrtc_app_create.restype = ctypes.POINTER(JrtcApp) - -#lib.jrtc_app_destroy.argtypes = [c_void_p] -lib.jrtc_app_destroy.argtypes = [ctypes.POINTER(JrtcApp)] -lib.jrtc_app_destroy.restype = None - -lib.jrtc_app_run.argtypes = [ctypes.POINTER(JrtcApp)] -lib.jrtc_app_run.restype = None - -lib.jrtc_app_router_channel_send_input_msg.argtypes = [ctypes.POINTER(JrtcApp), c_int, c_void_p, c_int] -lib.jrtc_app_router_channel_send_input_msg.restype = c_int - -lib.jrtc_app_router_channel_send_output_msg.argtypes = [ctypes.POINTER(JrtcApp), c_int, c_void_p, c_int] -lib.jrtc_app_router_channel_send_output_msg.restype = c_int - - - -####################################################################### -# Python wrapper to call jrtc_app_create C function -def jrtc_app_create(capsule, app_cfg: JrtcAppCfg_t, app_handler: JrtcAppHandler, app_state: Any) -> None : - +class ChannelCtx(ctypes.Structure): pass +class AppStateVars: pass + +class StreamItem(ctypes.Structure): + sid: Optional[JrtcRouterStreamId] = None + registered: bool = False + chan_ctx: Optional[ChannelCtx] = None + +@dataclass +class JrtcAppData: + env_ctx: JrtcAppEnv + app_cfg: JrtcAppCfg_t + app_handler: Any + app_state: AppStateVars + last_received_time: float + def __init__(self, env_ctx, app_cfg, app_handler, app_state, last_received_time): + self.env_ctx = env_ctx + self.app_cfg = app_cfg + self.app_handler = app_handler + self.app_state = app_state + self.last_received_time = last_received_time + +class JrtcApp: + def __init__(self, env_ctx, app_cfg, app_handler, app_state): + super().__init__() + self.data = JrtcAppData(env_ctx, app_cfg, app_handler, app_state, time.monotonic()) + self.stream_items: list[StreamItem] = [] + + def init(self) -> int: + start_time = time.monotonic() + self.last_received_time = start_time + + for i in range(self.data.app_cfg.num_streams): + stream = self.data.app_cfg.streams[i] + si = StreamItem() + si.sid = JrtcRouterStreamId() + + res = si.sid.generate_id( + stream.sid.destination, + stream.sid.device_id, + stream.sid.stream_source, + stream.sid.io_map, + ) + if res != 1: + print(f"{self.data.app_cfg.context}:: Failed to generate stream ID for stream {i}") + return -1 + _sid = si.sid + si.sid = si.sid.convert_to_struct_jrtc_router_stream_id() + + if stream.appChannel: + si.chan_ctx = jrtc_router_channel_create( + self.data.env_ctx.dapp_ctx, + stream.appChannel.contents.is_output, + stream.appChannel.contents.num_elems, + stream.appChannel.contents.elem_size, + _sid, + None, + 0, + ) + if not si.chan_ctx: + print(f"{self.data.app_cfg.context}:: Failed to create channel for stream {i}") + return -1 + + if stream.is_rx: + if not jrtc_router_channel_register_stream_id_req(self.data.env_ctx.dapp_ctx, si.sid): + print(f"{self.data.app_cfg.context}:: Failed to register stream {i}") + return -1 + si.registered = True + + self.stream_items.append(si) + + if self.data.app_cfg.initialization_timeout_secs > 0: + if time.monotonic() - start_time > self.data.app_cfg.initialization_timeout_secs: + print(f"{self.data.app_cfg.context}:: Initialization timeout") + return -1 + + for i in range(self.data.app_cfg.num_streams): + stream = self.data.app_cfg.streams[i] + si = self.stream_items[i] + if not stream.is_rx and not si.chan_ctx: + k = 0 + while not jrtc_router_input_channel_exists(si.sid): + time.sleep(0.1) + if k == 10: + print(f"{self.data.app_cfg.context}:: Waiting for creation of stream {i}") + k = 0 + else: + k += 1 + + if self.data.app_cfg.initialization_timeout_secs > 0: + if time.monotonic() - start_time > self.data.app_cfg.initialization_timeout_secs: + print(f"{self.data.app_cfg.context}:: Timeout waiting for stream {i}") + return -1 + return 0 + + def cleanup(self): + print(f"{self.data.app_cfg.context}:: Cleaning up app") + for si in self.stream_items: + if si.registered: + jrtc_router_channel_deregister_stream_id_req(self.data.env_ctx.dapp_ctx, si.sid) + if si.chan_ctx: + jrtc_router_channel_destroy(si.chan_ctx) + + def run(self): + if self.init() != 0: + return + + data_entries = get_data_entry_array_ptr(self.data.app_cfg.q_size) + while not self.data.env_ctx.app_exit: + now = time.monotonic() + if ( + self.data.app_cfg.inactivity_timeout_secs > 0 + and now - self.data.last_received_time > self.data.app_cfg.inactivity_timeout_secs + ): + self.data.app_handler(True, -1, None, self.data.app_state) + self.last_received_time = now + + num_rcv = jrtc_router_receive(self.data.env_ctx.dapp_ctx, data_entries, self.data.app_cfg.q_size) + for i in range(num_rcv): + data_entry = data_entries[i] + if not data_entry: + continue + for sidx in range(self.data.app_cfg.num_streams): + stream = self.data.app_cfg.streams[sidx] + si = self.stream_items[sidx] + if stream.is_rx and jrtc_router_stream_id_matches_req(data_entry.stream_id, si.sid): + self.data.app_handler(False, sidx, data_entry, self.data.app_state) + break + jrtc_router_channel_release_buf(data_entry.data) + self.data.last_received_time = time.monotonic() + + if self.data.app_cfg.sleep_timeout_secs > 0: + time.sleep(max(self.data.app_cfg.sleep_timeout_secs, 1e-9)) + + self.cleanup() + + def get_stream(self, stream_idx: int) -> Optional[JrtcRouterStreamId]: + if stream_idx < 0 or stream_idx >= len(self.stream_items): + return + return self.stream_items[stream_idx].sid + + def get_chan_ctx(self, stream_idx: int) -> Optional[ChannelCtx]: + if stream_idx < 0 or stream_idx >= len(self.stream_items): + return + return self.stream_items[stream_idx].chan_ctx + +def jrtc_app_create(capsule, app_cfg: JrtcAppCfg_t, app_handler, app_state): env_ctx = get_ctx_from_capsule(capsule) - app_handler_c = JrtcAppHandler(app_handler) - - print("app_handler_c = ", app_handler_c) - app_handler_raw = ctypes.cast(app_handler_c, ctypes.c_void_p) - - return lib.jrtc_app_create( - ctypes.byref(env_ctx), - ctypes.byref(app_cfg), - app_handler_c, - ctypes.pointer(app_state)) - - -####################################################################### -# Python wrapper to call jrtc_app_run C function -def jrtc_app_run(app: ctypes.POINTER(JrtcApp)) -> None: - lib.jrtc_app_run(app) - - -####################################################################### -# Python wrapper to call jrtc_app_destroy -def jrtc_app_destroy(app: ctypes.POINTER(JrtcApp)) -> None: - lib.jrtc_app_destroy(app) - - -####################################################################### -# Python wrapper to call jrtc_router_channel_send_input_msg -def jrtc_app_router_channel_send_input_msg(app: ctypes.POINTER(JrtcApp), stream_idx: int, data: bytes, data_len: int) -> int: - return lib.jrtc_app_router_channel_send_input_msg(app, stream_idx, data, data_len) - - -####################################################################### -# Python wrapper to call jrtc_router_channel_send_input_msg -def jrtc_app_router_channel_send_output_msg(app: ctypes.POINTER(JrtcApp), stream_idx: int, data: bytes, data_len: int) -> int: - return lib.jrtc_app_router_channel_send_output_msg(app, stream_idx, data, data_len) - - - - - -######################################################## -# Lists of items to expose when this module is importd -######################################################## -__all__ = [ - 'JRTC_ROUTER_REQ_DEST_ANY', - 'JRTC_ROUTER_REQ_DEVICE_ID_ANY', - 'JRTC_ROUTER_REQ_DEST_NONE', - 'JRTC_ROUTER_REQ_STREAM_PATH_ANY', - 'JRTC_ROUTER_REQ_STREAM_NAME_ANY', - 'struct_jrtc_router_data_entry', - - 'JrtcStreamIdCfg_t', - 'JrtcAppChannelCfg_t', - 'JrtcStreamCfg_t', - 'JrtcAppCfg_t', - - 'JrtcApp', - 'jrtc_app_create', - 'jrtc_app_run', - 'jrtc_app_destroy', - 'jrtc_app_router_channel_send_input_msg', - 'jrtc_app_router_channel_send_output_msg', - - ] + app_instance = JrtcApp( + env_ctx=env_ctx, + app_cfg=app_cfg, + app_handler=app_handler, + app_state=app_state, + ) + return app_instance + +def jrtc_app_run(app) -> None: + app.run() + +def jrtc_app_destroy(app) -> None: + app.cleanup() + del app + +def jrtc_app_router_channel_send_input_msg(app: JrtcApp, stream_idx: int, data: bytes, data_len: int) -> int: + stream = app.get_stream(stream_idx) + if not stream: + return -1 + return jrtc_router_channel_send_input_msg(stream, data, data_len) + +def jrtc_app_router_channel_send_output_msg(app: JrtcApp, stream_idx: int, data: bytes, data_len: int) -> int: + chan_ctx = app.get_chan_ctx(stream_idx) + if not chan_ctx: + return -1 + return jrtc_router_channel_send_output_msg(chan_ctx, data, data_len) + +__all__ = [ + "JRTC_ROUTER_REQ_DEST_ANY", + "JRTC_ROUTER_REQ_DEVICE_ID_ANY", + "JRTC_ROUTER_REQ_DEST_NONE", + "JRTC_ROUTER_REQ_STREAM_PATH_ANY", + "JRTC_ROUTER_REQ_STREAM_NAME_ANY", + "struct_jrtc_router_data_entry", + "JrtcStreamIdCfg_t", + "JrtcAppChannelCfg_t", + "JrtcStreamCfg_t", + "JrtcAppCfg_t", + "JrtcApp", + "jrtc_app_create", + "jrtc_app_run", + "jrtc_app_destroy", + "jrtc_app_router_channel_send_input_msg", + "jrtc_app_router_channel_send_output_msg", +] diff --git a/src/wrapper_apis/python/jrtc_router_stream_id.py b/src/wrapper_apis/python/jrtc_router_stream_id.py index 6d0c3b22..d3066e5f 100644 --- a/src/wrapper_apis/python/jrtc_router_stream_id.py +++ b/src/wrapper_apis/python/jrtc_router_stream_id.py @@ -53,10 +53,8 @@ def generate_id(self, fwd_dst, device_id, stream_path, stream_name): b_stream_path = None b_stream_name = None if stream_path != None: - stream_path = stream_path.encode("utf-8") b_stream_path = ctypes.create_string_buffer(stream_path) if stream_name != None: - stream_name = stream_name.encode("utf-8") b_stream_name = ctypes.create_string_buffer(stream_name) res = stream_id_lib.jrtc_router_generate_stream_id(