You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

197 lines
6.8 KiB

from ctypes import *
from ctypes.util import find_library
import numbers
import os
import logging
def cbox_uuid_to_str(uuid_ptr):
uuid_str = create_string_buffer(40)
cb.cbox_uuid_tostring(uuid_ptr, uuid_str)
return uuid_str.value.decode()
c_int32_p = POINTER(c_int32)
c_double_p = POINTER(c_double)
GQuark = c_int32
class GError(Structure):
_fields_ = [
( 'domain', GQuark ),
( 'code', c_int ),
( 'message', c_char_p ),
]
GErrorPtr = POINTER(GError)
GErrorPtrPtr = POINTER(GErrorPtr)
class OscCommand(Structure):
_fields_ = [
( 'command', c_char_p ),
( 'arg_types', c_char_p ),
( 'arg_values', POINTER(c_void_p) ),
]
class CboxBlob(Structure):
_fields_ = [
( 'data', c_void_p ),
( 'size', c_ulong ),
]
def bytes(self):
return string_at(self.data, self.size)
def __repr__(self):
return repr(self.bytes())
CboxBlobPtr = POINTER(CboxBlob)
class CboxObjHdr(Structure):
_fields_ = [
( 'class_ptr', c_void_p ),
( 'owner', c_void_p ),
( 'link_in_document', c_void_p ),
( 'cbox_uuid', c_ubyte * 16 ),
( 'stamp', c_uint64 ),
]
PROCESS_CMD_FUNC = CFUNCTYPE(c_bool, c_void_p, c_void_p, POINTER(OscCommand), GErrorPtrPtr)
class CmdTarget(Structure):
_fields_ = [
( 'user_data', c_void_p ),
( 'process_cmd', PROCESS_CMD_FUNC ),
]
CmdTargetPtr = POINTER(CmdTarget)
class PyCmdTarget(CmdTarget):
def __init__(self):
def process_cmd_func(cmd_target, fb_target, command, error_ptr_ptr):
cmd_target = CmdTargetPtr.from_address(cmd_target)
fb_target = CmdTargetPtr.from_address(fb_target) if fb_target else None
cmd = command.contents
argtypes = cmd.arg_types.decode()
data = cmd.arg_values
args = [None] * len(argtypes)
for i in range(len(argtypes)):
argtype = argtypes[i]
if argtype == 's':
args[i] = cast(data[i], c_char_p).value.decode()
elif argtype == 'i':
args[i] = cast(data[i], c_int32_p).contents.value
elif argtype == 'f':
args[i] = cast(data[i], c_double_p).contents.value
elif argtype == 'u':
args[i] = cbox_uuid_to_str(cast(data[i], c_void_p))
elif argtype == 'b':
args[i] = cast(data[i], CboxBlobPtr).contents.bytes()
elif argtype == 'o':
args[i] = cbox_uuid_to_str(cast(data[i], POINTER(CboxObjHdr)).contents.cbox_uuid)
#elif argtype == 'N':
# args[i] = None
try:
self.process(cmd.command.decode(), args)
return True
except Exception as e:
cb.g_error_set(1, 1, str(e))
return False
self.process_cmd = PROCESS_CMD_FUNC(process_cmd_func)
self.user_data = 0
def process(self, cmd, args):
print ("%s(%s)" % (cmd, repr(args)))
def find_calfbox():
if "CALFBOXLIBABSPATH" in os.environ:
logging.info("Searching for calfbox shared library with absolute path: %s", (os.environ["CALFBOXLIBABSPATH"]))
assert os.path.exists(os.environ["CALFBOXLIBABSPATH"]), os.environ["CALFBOXLIBABSPATH"]
cblib = os.environ["CALFBOXLIBABSPATH"]
elif "CALFBOXLIBFILENAME" in os.environ:
#This should be just name of the lib, not a filename and no "lib"-prefix. For example "calfbox" if the real name is libcalfbox.so.1
logging.info("Searching for calfbox shared library with just the filename: %s", (os.environ["CALFBOXLIBFILENAME"]))
cblib = find_library(os.environ["CALFBOXLIBFILENAME"])
else:
cblib = find_library('calfbox')
if cblib:
logging.info("Loading calfbox shared library: %s" % (cblib))
cb = cdll.LoadLibrary(cblib)
return cb
else:
raise FileNotFoundError("Calfbox shared library not found: %s" % (cblib))
cb = find_calfbox()
cb.cbox_embed_get_cmd_root.restype = CmdTargetPtr
class CalfboxException(Exception):
pass
def convert_exception(cls, gptr):
msg = gptr.contents.message
cb.g_error_free(gptr)
raise cls(msg.decode())
class WrapCmdTarget(PyCmdTarget):
def __init__(self, fb):
PyCmdTarget.__init__(self)
self.fb = fb
def process(self, cmd, args):
self.fb(cmd, None, args)
def init_engine(config=None):
gptr = GErrorPtr()
if not cb.cbox_embed_init_engine(config, byref(gptr)):
convert_exception(CalfboxException, gptr)
def start_audio(cmd_dumper=None):
gptr = GErrorPtr()
target = byref(WrapCmdTarget(cmd_dumper)) if cmd_dumper is not None else None
# XXXKF pass the callback
if not cb.cbox_embed_start_audio(target, byref(gptr)):
convert_exception(CalfboxException, gptr)
def stop_audio():
gptr = GErrorPtr()
if not cb.cbox_embed_stop_audio(byref(gptr)):
convert_exception(CalfboxException, gptr)
def shutdown_engine():
gptr = GErrorPtr()
if not cb.cbox_embed_shutdown_engine(byref(gptr)):
convert_exception(CalfboxException, gptr)
def do_cmd_on(target, cmd, fb, args):
gptr = GErrorPtr()
ocmd = OscCommand()
ocmd.command = cmd.encode()
acnt = len(args)
arg_types = create_string_buffer(acnt)
arg_values = (c_void_p * acnt)()
arg_space = (c_uint8 * (8 * acnt))()
# Scratch space for string encoding
tmp = []
for i in range(len(args)):
a = args[i]
t = type(a)
if isinstance(a, numbers.Number):
if t is float:
arg_types[i] = b'f'
arg_values[i] = addressof(arg_space) + 8 * i
cast(arg_values[i], c_double_p).contents.value = a
else:
arg_types[i] = b'i'
arg_values[i] = addressof(arg_space) + 8 * i
cast(arg_values[i], c_int32_p).contents.value = a
elif t == str:
tmp.append(create_string_buffer(a.encode()))
arg_types[i] = b's'
arg_values[i] = addressof(tmp[-1])
elif t == bytearray:
tmp.append(CboxBlob(cast((c_byte * len(a)).from_buffer(a), c_void_p), len(a)))
arg_types[i] = b'b'
arg_values[i] = addressof(tmp[-1])
else:
arg_types[i] = b'N'
ocmd.arg_types = cast(arg_types, c_char_p)
ocmd.arg_values = arg_values
if fb is not None:
res = target.contents.process_cmd(target, byref(WrapCmdTarget(fb)), ocmd, gptr)
else:
res = target.contents.process_cmd(target, None, ocmd, gptr)
if not res:
if gptr and gptr.contents:
raise Exception(gptr.contents.message.decode())
else:
raise Exception("Unknown error")
def do_cmd(cmd, fb, args):
do_cmd_on(cb.cbox_embed_get_cmd_root(), cmd, fb, args)