Forum Programmation.python Communication inter-processus

Posté par  . Licence CC By‑SA.
Étiquettes :
1
21
jan.
2017

Bonjour,

Je dois disposer d’un moyen de communiquer entre plusieurs processus. Typiquement entre un processus client interactif et un processus serveur en tâche de fond.

Le process client doit pouvoir non seulement envoyer les signaux standards (SIGINT, SIGTERM, etc…) mais aussi n’importe quel "message".

Avant d’essayer d’ajouter du code à mon projet j’ai fait un petit test afin de bien cerner le problème.

Je suis arrivé à la solution suivante, que je soumets à votre sagacité… Est-ce la bonne manière de faire. Comment procédez-vous vous même pour répondre à cette problématique ?

Je m’interroge aussi sur les constantes que j’ai choisies (65535 et 0.5), que j’interprète comme étant la taille maximum de mon "message" et le temps laissé pour la lecture du FIFO… Si je ne mets pas ce délai, cela résulte en un nombre de fichiers ouverts trop important (et ça plante, normal), vu que ça ouvre un autre fichier sans avoir eu le temps de finir de lire le premier…

J’ai pourtant besoin d’être en "non-blocking" si je veux passer à la suite…

Ces valeurs vous semblent correctes ?

#!/usr/bin/env python3

import sys
import subprocess
import signal
import os
from time import sleep

pidFilename = '/tmp/'+sys.argv[0]+'.pid'
fifoFilename = '/tmp/'+sys.argv[0]+'.fifo'

def graceful_exit(signum, frame):

    print('I terminate, my PID is '+str(os.getpid())+' and I received signal '+str(signum)+'', file=sys.stderr, flush=True)
    sys.exit(0)

try:

    if sys.argv[1] == 'start':

        try:

            os.unlink(fifoFilename)

        except FileNotFoundError:

            pass

        sleeper = subprocess.Popen(['./'+sys.argv[0], 'run'], stdin=subprocess.PIPE, shell=False)
        pidfile = open(pidFilename,'w')
        _pid = str(sleeper.pid)
        pidfile.write(_pid)
        pidfile.close()

    elif sys.argv[1] == 'run':

        signal.signal(signal.SIGTERM, graceful_exit)
        signal.signal(signal.SIGINT, graceful_exit)

        fifo = os.mkfifo(fifoFilename)

        while True:

            _message = os.read(os.open(fifoFilename, os.O_NONBLOCK | os.O_RDONLY), 65535).decode('UTF-8')

            if _message:

                print('I’m alive, my PID is '+str(os.getpid())+' and I’ve been told to say : « '+_message+' »', file=sys.stderr, flush=True)

            sleep(0.5)

    elif sys.argv[1] == 'stop':        

        pidfile = open(pidFilename,'r')
        os.kill(int(pidfile.read()), signal.SIGTERM)
        os.unlink(fifoFilename)
        pidfile.close()
        os.unlink(pidFilename)

    elif sys.argv[1] == 'say' and len(sys.argv) > 1:

        open(fifoFilename, 'w').write(sys.argv[2])

except IndexError as e:

    print(str(e), file=sys.stderr)

Le script s’utilise ainsi :

./sp.py start

démarre le "serveur"

./sp.py say <str>

fait écrire au "serveur" sur la sortie d’erreur

./sp.py stop

arrête le serveur.

Voilà, vous en pensez quoi ?

  • # Petite correction

    Posté par  . Évalué à 3.

    sleeper = subprocess.Popen([sys.argv[0], 'run'], stdin=subprocess.PIPE, shell=False)

    Il ne faut pas préfixer avec './' !

  • # Chaussette

    Posté par  . Évalué à 3.

    Après quelques recherches il me semble que les sockets puissent également être une solution à mon problème.

    J’arrive à ça, en essayant de faire exactement l’équivalent du premier script mais en utilisant socket :

    #!/usr/bin/env python3
    
    import sys
    import subprocess
    import socket
    import signal
    import os
    
    pidFilename = '/tmp/'+sys.argv[0]+'.pid'
    socketFilename = '/tmp/'+sys.argv[0]+'.socket'
    
    def graceful_exit(signum, frame):
    
        print('I terminate, my PID is '+str(os.getpid())+' and I received signal '+str(signum)+'', file=sys.stderr, flush=True)
        sys.exit(0)
    
    try:
    
        if sys.argv[1] == 'start':
    
            sleeper = subprocess.Popen([sys.argv[0], 'run'], stdin=subprocess.PIPE, shell=False)
            pidfile = open(pidFilename,'w')
            _pid = str(sleeper.pid)
            pidfile.write(_pid)
            pidfile.close()
    
        elif sys.argv[1] == 'run':
    
            signal.signal(signal.SIGTERM, graceful_exit)
            signal.signal(signal.SIGINT, graceful_exit)
    
            if os.path.exists(socketFilename):
    
                os.remove(socketFilename)
    
            server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
            server.bind(socketFilename)
    
            while True:
    
                _message = server.recv(65535).decode('UTF-8')
    
                if _message:
    
                    print('I’m alive, my PID is '+str(os.getpid())+' and I’ve been told to say : « '+_message+' »', file=sys.stderr, flush=True)
    
        elif sys.argv[1] == 'stop':        
    
            try:
    
                pidfile = open(pidFilename,'r')
                os.kill(int(pidfile.read()), signal.SIGTERM)
                pidfile.close()
                os.unlink(pidFilename)
    
            except (FileNotFoundError, ProcessLookupError):
    
                pass
    
        elif sys.argv[1] == 'say' and len(sys.argv) > 1:
    
            if os.path.exists(socketFilename):
    
                client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
                client.connect(socketFilename)
                client.send(bytes(sys.argv[2],'utf8'))
                client.close()
    
    except IndexError as e:
    
        print(str(e), file=sys.stderr)

    J’ai du mal à appréhender les pour et les contre de chaque méthode.

    En tous cas avec socket je n’ai pas besoin de faire ce truc crade avec sleep()… ça répond tout de suite, et ça n’interfère pas avec le stdin de mon terminal…

    • [^] # Re: Chaussette

      Posté par  . Évalué à 3.

      , vous en pensez quoi ?

      os.read(os.open(fifoFilename, os.O_NONBLOCK | os.O_RDONLY)

      C'est normal que tu épuises les fds disponibles car tu ne fermes pas ce qui est ouvet (en fait, tu attends que le GC le fasse, ce qui pourrait expliquer le pourquoi "avec un sleep ça marche").

      Typiquement, tu ouvres le fichier FIFO à l'extérieure de la boucle. Encore mieux, tu peux l'ouvrir avant de lancer ton subprocess (qui, in fine, n'est rien d'autre qu'un fork et donc hérité les FDs ouverts sauf si close_on_exec). Et puis, toujours mieux, au lieu d'utiliser un fichier fifo, tu peux utiliser un pipe anonyme. Mais… mais… Attendez ? Ne serait-ce pas ce que que stdin=subprocess.PIPE fait ? Mais oui :-) En fait, tu n'as qu'à faire des read/write sur ce stdin (en mode bloquant, de préférence), et tu l'as déjà ton IPC. :-)

      Utiliser des sockets est une autre option qui permet notamment de faire un select(), i.e. de multiplexer avec d'autres connections réseaux.

      • [^] # Re: Chaussette

        Posté par  . Évalué à 3.

        Merci pour ta réponse. Je peux continuer à ouvrir mon fifo dans la boucle si je le mets en bloquant et que je le ferme une fois que j’ai lu mes 65535 caractères :

        while True:
        
            _fifo = os.open(fifoFilename, os.O_RDONLY)
            _message = os.read(_fifo, 65535).decode('UTF-8')
        
            if _message:
        
                print('I’m alive, my PID is '+str(os.getpid())+' and I’ve been told to say : « '+_message+' »', file=sys.stderr, flush=True)
        
            os.close(_fifo)

        Là ça fonctionne comme je veux.

        Mais du coup je suis parti sur l’utilisation d’une socket…

        L’avantage d’une socket, si j’ai bien compris, c’est que je pourrais remplacer facilement, de manière transparente, ma socket UNIX par une socket INET, pour faire communiquer mes deux processus s’ils ne tournaient pas sur la même machine…

        • [^] # Re: Chaussette

          Posté par  . Évalué à 1. Dernière modification le 25 janvier 2017 à 08:41.

          note: tu peux utiliser la construction with open() as _fifo: qui te permet de scoper ton FD, pour le fermer automatiquement lorsque que le cours d'exécution sort du scope, et donc aussi en cas d'exception non rattrapée :)

  • # Pour compléter ces exemples

    Posté par  . Évalué à 3.

    En utilisant une socket de type AF_INET c’est un peu différent.

    #!/usr/bin/env python3
    
    import sys
    import subprocess
    import socket
    import signal
    import os
    
    pidFilename = '/tmp/'+sys.argv[0]+'.pid'
    
    def graceful_exit(signum, frame):
    
        print('I terminate, my PID is '+str(os.getpid())+' and I received signal '+str(signum)+'', file=sys.stderr, flush=True)
        sys.exit(0)
    
    try:
    
        if sys.argv[1] == 'start':
    
            sleeper = subprocess.Popen([sys.argv[0], 'run'], stdin=subprocess.PIPE, shell=False)
            pidfile = open(pidFilename,'w')
            _pid = str(sleeper.pid)
            pidfile.write(_pid)
            pidfile.close()
    
        elif sys.argv[1] == 'run':
    
            signal.signal(signal.SIGTERM, graceful_exit)
            signal.signal(signal.SIGINT, graceful_exit)
    
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.bind(('127.0.0.1', 1234))
            server.listen(3)
    
    
            while True:
    
    
                client, addr = server.accept()
                message = client.recv(65535).decode('UTF-8')
    
                if message:
    
                    print('I’m alive, my PID is '+str(os.getpid())+' and I’ve been told to say : « '+message+' »', file=sys.stderr, flush=True)
    
    
        elif sys.argv[1] == 'stop':        
    
            try:
    
                pidfile = open(pidFilename,'r')
                os.kill(int(pidfile.read()), signal.SIGTERM)
                pidfile.close()
                os.unlink(pidFilename)
    
            except (FileNotFoundError, ProcessLookupError):
    
                pass
    
        elif sys.argv[1] == 'say' and len(sys.argv) > 1:
    
            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            client.connect(('127.0.0.1', 1234))
            client.send(bytes(sys.argv[2],'utf8'))
    
    
    except IndexError as e:
    
        print(str(e), file=sys.stderr)
    • [^] # Pour compléter ces exemples, la version avec socket non-bloquantes

      Posté par  . Évalué à 3.

      Bon alors pour mon exemple ça ne sert à rien d’avoir des sockets non-bloquantes puisse qu’il y en a qu’une mais dans le cas où on devrait en manipuler plusieurs on peut faire ainsi :

      #!/usr/bin/env python3
      
      import sys
      import subprocess
      import socket
      import signal
      import select
      import os
      
      pidFilename = '/tmp/'+sys.argv[0]+'.pid'
      
      def graceful_exit(signum, frame):
      
          print('I terminate, my PID is '+str(os.getpid())+' and I received signal '+str(signum)+'', file=sys.stderr, flush=True)
          sys.exit(0)
      
      try:
      
          if sys.argv[1] == 'start':
      
              sleeper = subprocess.Popen([sys.argv[0], 'run'], stdin=subprocess.PIPE, shell=False)
              pidfile = open(pidFilename,'w')
              _pid = str(sleeper.pid)
              pidfile.write(_pid)
              pidfile.close()
      
          elif sys.argv[1] == 'run':
      
              signal.signal(signal.SIGTERM, graceful_exit)
              signal.signal(signal.SIGINT, graceful_exit)
      
              server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
              server.bind(('127.0.0.1', 1234))
              server.setblocking(0)
              server.listen(3)
      
      
              while True:
      
                  _sockets_ready = select.select([server],[],[])
                  client, addr = _sockets_ready[0][0].accept()
      
                  message = client.recv(65535).decode('UTF-8')
      
                  if message:
      
                      print('I’m alive, my PID is '+str(os.getpid())+' and I’ve been told to say : « '+message+' »', file=sys.stderr, flush=True)
      
      
          elif sys.argv[1] == 'stop':        
      
              try:
      
                  pidfile = open(pidFilename,'r')
                  os.kill(int(pidfile.read()), signal.SIGTERM)
                  pidfile.close()
                  os.unlink(pidFilename)
      
              except (FileNotFoundError, ProcessLookupError):
      
                  pass
      
          elif sys.argv[1] == 'say' and len(sys.argv) > 1:
      
              try:
      
                  client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                  client.connect(('127.0.0.1', 1234))
                  client.send(bytes(sys.argv[2],'utf8'))
      
              except ConnectionRefusedError as e:
      
                  print(str(e), file=sys.stderr)
      
      except IndexError as e:
      
          print(str(e), file=sys.stderr)

      C’est le select qui prend en objet les trois listes de sockets (potential_readers, potential_writers, potential_errs) et retourne ces trois listes avec seulement les sockets qui sont prêtes :

      _sockets_ready = select.select([server],[],[])
      client, addr = _sockets_ready[0][0].accept()
      message = client.recv(65535).decode('UTF-8')

      Ici le accept() n’aura pas lieu tant qu’il n’y a rien à lire sur la socket 'server', « bloquant » ainsi la boucle while, qui n’a effectivement aucune raison de tourner s’il n’y a rien à lire…

      https://docs.python.org/3/howto/sockets.html#non-blocking-sockets

      J’espère ne pas avoir raconté trop de connerie et qu’on me corrigera le cas échéant.

Suivre le flux des commentaires

Note : les commentaires appartiennent à celles et ceux qui les ont postés. Nous n’en sommes pas responsables.