Manipulation de bytes

TR_2000 Messages postés 23 Date d'inscription   Statut Membre Dernière intervention   -  
yg_be Messages postés 23437 Date d'inscription   Statut Contributeur Dernière intervention   -
Bonjour,

Je cherche à créer un graphique et de l'audio à partir de bytes de données. Malheureusement je ne comprends pas comment extraire l'information à partir de ces données reçues.

Je travaille avec GnuRadio, le processus est simple : j'échantillonne un signal reçu par une radio à 2MHz, puis j'envoie ces données sur un port en UDP. Pour le moment je n'effectue aucune modification dessus. Je peux paramétrer la taille de la "payload" en bytes.

De l'autre coté, j'ai un serveur en python qui écoute le port et c'est ici que j'aimerai traiter les données.
Si je ne touche à rien, avec la fonction sockets.recvfrom() j'obtient ceci :

b'`\xbb\x00\x00\xc0\xbb\x00\x00\x80\xbb'

Ce résultat est pour une payload de 10bytes, je remarque alors que la taille détermine à peu près le nombre de valeur \xXX

J'utilise la formule "int.from_bytes() pour convertir ces données en entier (simplement déjà pour comprendre à quoi elles correspondent) le problème est que j'obtient :

52354345668182016 par exemple pour les données juste au dessus.

Je comprends alors que toutes les données sont réunies en un seul nombre ici et je ne suis pas sûr de vouloir cela.

Auriez-vous une idée de comment traiter ces données ? Je ne suis pas habitué à manipuler des bytes, la solution est sûrement toute simple. Peut-être simplement une bouble for pour séparer chaque données \x ?

Merci de votre aide,

4 réponses

  1. yg_be Messages postés 23437 Date d'inscription   Statut Contributeur Dernière intervention   Ambassadeur 1 588
     
    bonjour,
    tu n'expliques pas ce que tu veux faire avec chaque octet.
    peut-être:
    bs=b'`\xbb\x00\x00\xc0\xbb\x00\x00\x80\xbb'
    print(type(bs),bs)
    for b in list(bs):
        print (type(b),b)
    0
  2. yg_be Messages postés 23437 Date d'inscription   Statut Contributeur Dernière intervention   Ambassadeur 1 588
     
    Si tu veux reconstituer les données comme elles étaient avant d'être envoyées, tu devrais montrer comment elles sont envoyées.
    0
    1. TR_2000 Messages postés 23 Date d'inscription   Statut Membre Dernière intervention   1
       


      Merci pour votre réponse, je vais essayer...!

      Comme vous le voyez sur la photo, je ne sais justement pas comment sont les données avant d'être envoyées. C'est le logiciel qui acquière des informations à partir de la radio (représenté par le bloc Osmocom Source) puis je transmets directement cela vers le bloc UDP.
      On voit d'ailleurs que sur l'image la payload est maintenant de 1024 bytes.

      Mon objectif est à terme de créer un graphique type waterfall pour permettre à un utilisateur d'observer quelles sont les fréquences où il y a de l'activité par exemple.

      Pour cela, je dois d'abord comprendre qu'est ce qui "se cache" derrière les données que je reçois en UDP. C'est la où je coince...
      0
      1. yg_be Messages postés 23437 Date d'inscription   Statut Contributeur Dernière intervention   1 588 > TR_2000 Messages postés 23 Date d'inscription   Statut Membre Dernière intervention  
         
        Donc cela n'a rien à voir avec python: tu veux déterminer comment les données sont envoyées.

        Quand j'utilise int.from_bytes() sur tes données, je n'obtiens pas le même résultat que toi. Peux-tu montrer ton code?

        Tu peux peut-être essayer en traitant 4 bytes à la fois.
        0
      2. TR_2000 Messages postés 23 Date d'inscription   Statut Membre Dernière intervention   1 > yg_be Messages postés 23437 Date d'inscription   Statut Contributeur Dernière intervention  
         
        Très bien, mon code est le suivant :

        #Ajout des librairies :
        
        import socket 			#pour les sockets	
        import websockets 		#pour les websockets  
        import asyncio    		#pour les fonctions asynchrones (qui s'execute dans n'importe quel ordre)		
        import time				#pour le sleep()	
        from threading import Thread
        import numpy as np
        from scipy import fromfile
        import matplotlib.pyplot as plt 
        
        msg_gnuradio = "pas de données pour l'instant"
        int_msg_gnuradio = 0
        #Fonction liée au thread UDP 
        def listen_udp():
            global msg_gnuradio
            global int_msg_gnuradio
            payload_size = 4              #Nombre de bytes des données GnuRadio 
            #Création socket UDP
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        
            #Lier la socket à l'adress IP et le port
            s.bind(('localhost', 1234))
            print("Serveur UDP à l'écoute")
            while(True):
        
                addr = s.recvfrom(payload_size)
                # msg_gnuradio = format(addr)
                msg_gnuradio = addr[0] 
                # print(msg_gnuradio)
                int_msg_gnuradio = int.from_bytes(msg_gnuradio, byteorder='big', signed=True)
                # print(int_msg_gnuradio)
                # time.sleep(1)
        
        
        #Fonction liée au thread WEB 
        async def listen_ws(websocket, path):
            try:
                msg_client = await websocket.recv()
                print("< " + msg_client)
                i = 0
                global msg_gnuradio
                while True:
                    try:
                        await websocket.send(format(i)+" : "+msg_gnuradio)
                        time.sleep(1)
                        i = i+1
                    except KeyboardInterrupt:
                        print("Exiting program...")
            finally:
                await websocket.send("Le serveur n'est pas connecté à GNURadio")
        	
        
        def start_loop(loop, server):
            loop.run_until_complete(server)
            loop.run_forever()
        
        
        #Lancement Thread UDP 
        udp = Thread(target=listen_udp)			#Création thread pour l'UDP	
        udp.start()
        
        # start a new event loop
        new_loop = asyncio.new_event_loop()
        
        #Lancement Thread WEB 
        port_websockets = 8000		#port pour les connexions websocket	
        nb_clients = 0				#nombre de clients connecté	
        start_server = websockets.serve(listen_ws, "localhost", port_websockets, loop=new_loop)
        web = Thread(target=start_loop, args=(new_loop, start_server))
        web.start()
        print("Serveur websocket à l'écoute sur le port " + str(port_websockets))
        
        while True:
            print(msg_gnuradio)
            print(int_msg_gnuradio)
            print(type(msg_gnuradio),msg_gnuradio)
            for b in list(msg_gnuradio):
                print(type(b),b)
            time.sleep(1)
        
        
        
        #Traitement des données  
        # T = 1./(32e3)
        # datas = int(msg_gnuradio)
        # n = len(datas)
        # t = np.linspace(0, T*n, num = n) 
        # plt.plot(t, datas)
        # plt.show()
        # time.sleep(1)
        # plt.close()
        


        C'est un serveur qui a pour but d'envoyer des données vers un client Websockets, la partie UDP se trouve au début.

        J'ai testé pour 4 bytes et j'obtient ceci pour un tour de boucle :

        b'\x00\x00^\xbd'
        24253
        <class 'bytes'> b'\x00\x00^\xbd'
        <class 'int'> 0
        <class 'int'> 0
        <class 'int'> 94
        <class 'int'> 189
        0
  3. yg_be Messages postés 23437 Date d'inscription   Statut Contributeur Dernière intervention   Ambassadeur 1 588
     
    Si j'ai bien compris, GnuRadio peut générer du code Python.

    Donc, si tu crées un nouveau processus GnuRadio qui contient un bloc "UDP Source", puis que tu génères le code Python correspondant à ce processus, tu pourras examiner ce code Python pour déterminer comment il analyse les données UDP.
    0
    1. TR_2000 Messages postés 23 Date d'inscription   Statut Membre Dernière intervention   1
       
      Oui, je suis d'accord avec vous, le problème est qu'avec mes connaissances en tous cas je ne trouve pas l'information..

      Voici le code généré pour une architecture contenant que le bloc Osmocom de ma radio et le bloc UDP pour garder uniquement ce qui m'intéresse :

      #!/usr/bin/env python2
      # -*- coding: utf-8 -*-
      ##################################################
      # GNU Radio Python Flow Graph
      # Title: Top Block
      # Generated: Fri Jun  3 16:56:14 2022
      ##################################################
      
      from distutils.version import StrictVersion
      
      if __name__ == '__main__':
          import ctypes
          import sys
          if sys.platform.startswith('linux'):
              try:
                  x11 = ctypes.cdll.LoadLibrary('libX11.so')
                  x11.XInitThreads()
              except:
                  print "Warning: failed to XInitThreads()"
      
      from PyQt5 import Qt, QtCore
      from gnuradio import blocks
      from gnuradio import eng_notation
      from gnuradio import gr
      from gnuradio.eng_option import eng_option
      from gnuradio.filter import firdes
      from gnuradio.qtgui import Range, RangeWidget
      from optparse import OptionParser
      import osmosdr
      import sys
      import time
      from gnuradio import qtgui
      
      
      class top_block(gr.top_block, Qt.QWidget):
      
          def __init__(self):
              gr.top_block.__init__(self, "Top Block")
              Qt.QWidget.__init__(self)
              self.setWindowTitle("Top Block")
              qtgui.util.check_set_qss()
              try:
                  self.setWindowIcon(Qt.QIcon.fromTheme('gnuradio-grc'))
              except:
                  pass
              self.top_scroll_layout = Qt.QVBoxLayout()
              self.setLayout(self.top_scroll_layout)
              self.top_scroll = Qt.QScrollArea()
              self.top_scroll.setFrameStyle(Qt.QFrame.NoFrame)
              self.top_scroll_layout.addWidget(self.top_scroll)
              self.top_scroll.setWidgetResizable(True)
              self.top_widget = Qt.QWidget()
              self.top_scroll.setWidget(self.top_widget)
              self.top_layout = Qt.QVBoxLayout(self.top_widget)
              self.top_grid_layout = Qt.QGridLayout()
              self.top_layout.addLayout(self.top_grid_layout)
      
              self.settings = Qt.QSettings("GNU Radio", "top_block")
              self.restoreGeometry(self.settings.value("geometry", type=QtCore.QByteArray))
      
      
              ##################################################
              # Variables
              ##################################################
              self.sample_rate = sample_rate = 2e6
              self.frequency = frequency = 100e6
      
              ##################################################
              # Blocks
              ##################################################
              self._frequency_range = Range(1e6, 2e9, 100e3, 100e6, 200)
              self._frequency_win = RangeWidget(self._frequency_range, self.set_frequency, "frequency", "counter_slider", float)
              self.top_grid_layout.addWidget(self._frequency_win)
              self.xtrx0 = osmosdr.source( args="numchan=" + str(1) + " " + '' )
              self.xtrx0.set_sample_rate(sample_rate)
              self.xtrx0.set_center_freq(frequency, 0)
              self.xtrx0.set_freq_corr(0, 0)
              self.xtrx0.set_dc_offset_mode(2, 0)
              self.xtrx0.set_iq_balance_mode(0, 0)
              self.xtrx0.set_gain_mode(True, 0)
              self.xtrx0.set_gain(10, 0)
              self.xtrx0.set_if_gain(20, 0)
              self.xtrx0.set_bb_gain(20, 0)
              self.xtrx0.set_antenna('', 0)
              self.xtrx0.set_bandwidth(0, 0)
      
              self.blocks_udp_sink_0 = blocks.udp_sink(gr.sizeof_gr_complex*1, '127.0.0.1', 1234, 4, True)
      
      
      
              ##################################################
              # Connections
              ##################################################
              self.connect((self.xtrx0, 0), (self.blocks_udp_sink_0, 0))
      
          def closeEvent(self, event):
              self.settings = Qt.QSettings("GNU Radio", "top_block")
              self.settings.setValue("geometry", self.saveGeometry())
              event.accept()
      
          def get_sample_rate(self):
              return self.sample_rate
      
          def set_sample_rate(self, sample_rate):
              self.sample_rate = sample_rate
              self.xtrx0.set_sample_rate(self.sample_rate)
      
          def get_frequency(self):
              return self.frequency
      
          def set_frequency(self, frequency):
              self.frequency = frequency
              self.xtrx0.set_center_freq(self.frequency, 0)
      
      
      def main(top_block_cls=top_block, options=None):
      
          qapp = Qt.QApplication(sys.argv)
      
          tb = top_block_cls()
          tb.start()
          tb.show()
      
          def quitting():
              tb.stop()
              tb.wait()
          qapp.aboutToQuit.connect(quitting)
          qapp.exec_()
      
      
      if __name__ == '__main__':
          main()
      
      


      De ce que je comprends, les lignes 70 à 85 concernent les paramètres que j'ai donné au bloc Osmocom Source
      Puis vient le bloc UDP sink à la ligne 87 qui n'est pas + détaillé que ça...
      0
  4. yg_be Messages postés 23437 Date d'inscription   Statut Contributeur Dernière intervention   Ambassadeur 1 588
     
    Je me demande si tu ne pourrais pas extraire les données ainsi:
    import numpy
    bs=b'`\xbb\x00\x00\xc0\xbb\x00\x00'
    x=numpy.frombuffer(bs,dtype=numpy.complex64)
    for y in x:
        print(type(y),y)
    0
    1. TR_2000 Messages postés 23 Date d'inscription   Statut Membre Dernière intervention   1
       
      Bonjour,

      Non, cette solution ne fonctionne pas...

      Merci tout de même
      0
      1. yg_be Messages postés 23437 Date d'inscription   Statut Contributeur Dernière intervention   1 588 > TR_2000 Messages postés 23 Date d'inscription   Statut Membre Dernière intervention  
         
        "ne fonctionne pas"?
        0