#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Copyright 2020, Nils Hilbricht, Germany ( https://www.hilbricht.net )
This file is part of the Laborejo Software Suite ( https://www.laborejo.org ),

This application is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import logging; logger = logging.getLogger(__name__); logger.info("import")
import os, socket, asyncio
from signal import signal, SIGTERM, SIGUSR1
from threading import Thread
from sys import argv
from .nsmclient import _IncomingMessage, _OutgoingMessage
class NSMProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that plays the server side of NSM for one client.

    It answers the client's announce with a reply and an open message,
    forwards save requests and terminates the client process on
    hide/stop requests.

    State is kept in class attributes rather than on the instance so that
    ``staticSave`` can be installed as a plain signal handler without
    needing a reference to the protocol instance.
    """

    directory = None  # Path sent as first argument of /nsm/client/open. Set by the caller before start.
    addr = None  # A cache: sender address of the most recently received datagram.
    pid = None  # pid taken from the client's announce message; target of SIGTERM.

    def __init__(self):
        super().__init__()
        self.transport = None  # Filled in by connection_made.

    def connection_made(self, transport):
        """Remember the transport handed over by the event loop."""
        self.transport = transport

    def datagram_received(self, data, addr):
        """Parse one incoming OSC datagram and react to the known paths.

        data -- raw datagram bytes, handed to _IncomingMessage for parsing.
        addr -- sender address; cached in NSMProtocol.addr and used as
                destination for every answer.

        Unknown OSC paths are silently ignored.
        """
        NSMProtocol.addr = addr
        msg = _IncomingMessage(data)
        path = msg.oscpath

        if path == "/nsm/server/announce":
            # Only the application name and the pid are needed; the other
            # announce parameters are unpacked to validate the message shape.
            application_name, _capabilities, _executable_name, _api_major, _api_minor, pid = msg.params
            NSMProtocol.pid = pid

            reply = _OutgoingMessage("/reply")
            reply.add_arg("/nsm/server/announce")
            reply.add_arg("Welcome!")
            reply.add_arg("Fake Save Server")
            reply.add_arg("server-control:")
            self.send(reply, addr)

            # Example of open parameters as sent by a real session:
            #['/home/user/NSM Sessions/dev-example/QtCboxNsm Exämple ツ.nXDBM', 'dev-example', 'QtCboxNsm Exämple ツ.nXDBM']
            openMsg = _OutgoingMessage("/nsm/client/open")
            openMsg.add_arg(NSMProtocol.directory)
            openMsg.add_arg("NOT-A-SESSION")
            openMsg.add_arg(application_name)
            self.send(openMsg, addr)

            self.send(_OutgoingMessage("/nsm/client/show_optional_gui"), addr)

        elif path == "/nsm/gui/client/save":
            self.send(_OutgoingMessage("/nsm/client/save"), addr)

        elif path in ("/nsm/client/gui_is_hidden", "/nsm/server/stop"):
            # Both a hidden GUI and an explicit stop end the client process.
            self._terminateClient()

    def _terminateClient(self):
        """Send SIGTERM to the announced client, if a client has announced."""
        pid = NSMProtocol.pid
        if pid is None:
            # No announce seen yet, so there is no pid to signal.
            logging.getLogger(__name__).warning("Ignoring stop request: no client pid announced yet")
            return
        os.kill(pid, SIGTERM)

    def send(self, message, addr):
        """Send one outgoing message to addr through a short-lived UDP socket.

        message -- object whose build() returns the datagram bytes.
        addr    -- destination (host, port) tuple.

        The method does not use ``self``; staticSave relies on that and
        passes None.
        """
        # The context manager closes the socket; without it every send
        # would leak one file descriptor.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:  # UDP
            sock.sendto(message.build(), addr)

    @staticmethod
    def staticSave(*args):
        """Ask the client to save. Usable directly as a signal handler.

        args -- ignored; accepts the (signum, frame) pair a signal handler receives.

        Does nothing while no datagram has been received, because there is
        no known client address to send to.
        """
        if NSMProtocol.addr is None:
            return
        NSMProtocol.send(None, _OutgoingMessage("/nsm/client/save"), NSMProtocol.addr)
def startSingleNSMServer(directory):
    """Set all paths like NSM would receive them and nsmclient.py expects them.

    Starts the fake NSM server in a daemon thread and prepares the process
    environment for a client running in this same process:

    * NSM_URL points to the fake server on a free localhost UDP port.
    * argv[0] is rewritten to an absolute path two directories above this file.
    * That directory is appended to PATH.
    * SIGUSR1 is connected to NSMProtocol.staticSave.

    directory -- path that the server sends to the client in /nsm/client/open.

    Must be called from the main thread because it installs a signal handler.
    """
    # Bind once to a free port provided by the host and keep this socket as
    # the real server socket. Closing it and binding the same port again
    # later would leave a window in which another process can grab the port.
    serverSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    serverSock.bind(("127.0.0.1", 0))
    SERVER_PORT = serverSock.getsockname()[1]

    NSMProtocol.directory = directory

    os.environ["NSM_URL"] = f"osc.udp://localhost:{SERVER_PORT}/"

    executableName = os.path.basename(argv[0])
    executableDir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
    # No existence check for executableDir/executableName here: that is not valid anymore with zipapp.
    argv[0] = os.path.join(executableDir, executableName)  # NSM speciality. nsmclient explicitly checks for this

    # Append to PATH without failing when PATH is unset or empty.
    currentPath = os.environ.get("PATH", "")
    os.environ["PATH"] = currentPath + os.pathsep + executableDir if currentPath else executableDir

    logger.info(f"Starting fake NSM server on port {SERVER_PORT}")

    #For Carla:
    signal(SIGUSR1, NSMProtocol.staticSave)

    # A dedicated loop for the server thread. asyncio.get_event_loop() is
    # deprecated when no loop is running and raises on current Python versions.
    loop = asyncio.new_event_loop()

    def run_loop(loop):
        """Thread body: attach the server socket to the loop and serve forever."""
        asyncio.set_event_loop(loop)
        endpoint = loop.create_datagram_endpoint(NSMProtocol, sock=serverSock)
        loop.run_until_complete(endpoint)
        loop.run_forever()

    Thread(target=run_loop, args=(loop,), daemon=True).start()  # Daemon makes the thread just stop when main thread ends.