#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Copyright 2020, Nils Hilbricht, Germany ( https://www.hilbricht.net )

This file is part of the Laborejo Software Suite ( https://www.laborejo.org ),
more specifically its template base application.

The Template Base Application is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see https://www.gnu.org/licenses/.
"""

import logging; logger = logging.getLogger(__name__); logger.info("import")

import os
import socket
import asyncio
from signal import signal, SIGTERM, SIGUSR1
from threading import Thread
from sys import argv

from .nsmclient import _IncomingMessage, _OutgoingMessage


class NSMProtocol(asyncio.DatagramProtocol):
    """Minimal fake NSM server that talks to exactly one client over UDP/OSC.

    State is kept on the class (not the instance) so that the signal handler
    `staticSave`, which has no instance at hand, can reach it.
    """

    directory = None  # Path sent to the client in /nsm/client/open. Set by startSingleNSMServer.
    addr = None  # Cache: address of the last datagram sender. None until a client has talked to us.
    pid = None  # Process id the client reported in its announce. None until announced.

    def __init__(self):
        super().__init__()

    def connection_made(self, transport):
        """Remember the asyncio transport of the listening endpoint."""
        self.transport = transport

    def datagram_received(self, data, addr):
        """Parse one incoming OSC message and answer it.

        data -- raw datagram bytes, decoded by _IncomingMessage.
        addr -- sender address; cached in NSMProtocol.addr and used as reply target.

        Messages with any other OSC path are silently ignored.
        """
        NSMProtocol.addr = addr
        msg = _IncomingMessage(data)

        if msg.oscpath == "/nsm/server/announce":
            # An announce is expected to carry exactly these six parameters.
            application_name, capabilities, executable_name, api_version_major, api_version_minor, pid = msg.params
            NSMProtocol.pid = pid

            reply = _OutgoingMessage("/reply")
            reply.add_arg("/nsm/server/announce")
            reply.add_arg("Welcome!")
            reply.add_arg("Fake Save Server")
            reply.add_arg("server-control:")
            self.send(reply, addr)

            # Example of what a client receives from a real session:
            # ['/home/user/NSM Sessions/dev-example/QtCboxNsm Exämple ツ.nXDBM', 'dev-example', 'QtCboxNsm Exämple ツ.nXDBM']
            openMsg = _OutgoingMessage("/nsm/client/open")
            openMsg.add_arg(NSMProtocol.directory)
            openMsg.add_arg("NOT-A-SESSION")
            openMsg.add_arg(application_name)
            self.send(openMsg, addr)

            self.send(_OutgoingMessage("/nsm/client/show_optional_gui"), addr)

        elif msg.oscpath == "/nsm/gui/client/save":
            self.send(_OutgoingMessage("/nsm/client/save"), addr)

        elif msg.oscpath in ("/nsm/client/gui_is_hidden", "/nsm/server/stop"):
            # Hiding the GUI and stopping the server both end the single client.
            self._terminateClient()

    @staticmethod
    def _terminateClient():
        """Send SIGTERM to the announced client. Does nothing if no client announced yet."""
        if NSMProtocol.pid is None:
            logger.warning("Asked to stop the client but no client has announced itself yet")
            return
        os.kill(NSMProtocol.pid, SIGTERM)

    def send(self, message, addr):
        """Send one OSC message to addr through a short-lived UDP socket.

        message -- object with a build() method returning the datagram bytes.
        addr -- destination address as accepted by socket.sendto.

        Does not use self, so staticSave can call it without an instance.
        """
        # The context manager closes the socket; otherwise every message would leak a descriptor.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:  # UDP
            sock.sendto(message.build(), addr)

    @staticmethod
    def staticSave(*args):
        """Tell the cached client to save. Usable as signal handler, hence *args.

        Does nothing (except logging) when no client has contacted the server yet,
        because there is no address to send to.
        """
        if NSMProtocol.addr is None:
            logger.warning("Save requested but no client has contacted the fake NSM server yet")
            return
        NSMProtocol.send(None, _OutgoingMessage("/nsm/client/save"), NSMProtocol.addr)


def startSingleNSMServer(directory):
    """Set all paths like NSM would receive them and nsmclient.py expects them.

    directory -- stored in NSMProtocol.directory and sent to the client in /nsm/client/open.

    Side effects:
      * sets the environment variable NSM_URL to the fake server's port
      * replaces argv[0] with an absolute path and appends its directory to PATH
      * installs NSMProtocol.staticSave as SIGUSR1 handler
      * starts a daemon thread that runs an asyncio loop serving NSMProtocol
    """
    # Bind to a free port provided by the host and keep the socket open. It is handed over to
    # asyncio below, so no other process can grab the port between choosing and serving it.
    serverSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    serverSock.bind(('127.0.0.1', 0))
    SERVER_PORT = serverSock.getsockname()[1]

    NSMProtocol.directory = directory

    os.environ["NSM_URL"] = f"osc.udp://localhost:{SERVER_PORT}/"

    executableName = os.path.basename(argv[0])
    # The executable is expected two directories above this file.
    executableDir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
    executablePath = os.path.join(executableDir, executableName)
    assert os.path.exists(executablePath), executablePath
    argv[0] = executablePath

    # NSM speciality. nsmclient explicitly checks for this
    os.environ["PATH"] = os.environ["PATH"] + os.pathsep + executableDir

    logger.info(f"Starting fake NSM server on port {SERVER_PORT}")

    # For Carla:
    signal(SIGUSR1, NSMProtocol.staticSave)

    # A dedicated loop for the server thread. asyncio.get_event_loop() is deprecated for
    # this use and fails on recent Python versions when no loop is set.
    loop = asyncio.new_event_loop()

    def run_loop(loop):
        asyncio.set_event_loop(loop)
        endpoint = loop.create_datagram_endpoint(NSMProtocol, sock=serverSock)
        loop.run_until_complete(endpoint)
        loop.run_forever()

    # Daemon makes the thread just stop when main thread ends.
    Thread(target=run_loop, args=(loop,), daemon=True).start()