Module gonioimsoft.vio_server

Voltage input/output server.

Functions

def main()
def run_plotter(queue, title)

Classes

class DummyBoard

For testing the server without an NI board.

For documentation, see the NIBoard class.

Expand source code
class DummyBoard:
    '''For testing the server without an NI board.

    For documentation, see the NIBoard class.
    '''
    def analog_input(self, duration, wait_trigger=False):
        print('DummyBoard.analog_input(...)')
        print(f'dur={duration} | wait_trigger={wait_trigger}')

    def set_settings(self, device, channels, fs):
        print('DummyBoard.set_settings(...)')
        print(f'dvs={device} | chs={channels} | fs={fs}')

Methods

def analog_input(self, duration, wait_trigger=False)
def set_settings(self, device, channels, fs)
class NIBoard
Expand source code
class NIBoard:
    def __init__(self):

        self.device = 'Dev2'
        self.channels = ['ai0']
        self.fs = 1000

        self.live_queue = None
        self.title = 'Analog input'
    

    def set_settings(self, device, channels, fs):
        '''

        Arguments
        ---------
        dev : string
            The NI board name (usually "Dev1" or "Dev2").
        channels : string
            Channels names to record, separated by commas.
        fs : float or int
            The used sampling frequency in Hz (samples/second).
        '''
        self.device = device
        self.channels = channels.split(',')
        self.fs = float(fs)

        self.title = f'Analog input - {self.device} {self.channels} {self.fs} Hz'
        
   
    def analog_input(self, duration, save=None, wait_trigger=False):
        '''Records voltage input and saves it.

           
        duration : int or float
            In seconds, the recording's length.

        '''
        if save == 'None':
            save = None
            
        duration = float(duration)
        timeout = duration + 10

        N_channels = len(self.channels)
        N_samples = int(duration * self.fs)
        

        if str(wait_trigger).lower() == 'true':
            wait_trigger = True
        else:
            wait_trigger = False


        with nidaqmx.Task() as task:
            
            for channel in self.channels:
                task.ai_channels.add_ai_voltage_chan(f'{self.device}/{channel}')

            task.timing.cfg_samp_clk_timing(
                    self.fs, samps_per_chan=N_samples)
            task.ai_conv_rate = self.fs
            if wait_trigger:
                task.triggers.start_trigger.cfg_dig_edge_start_trig(
                        f'/{self.device}/PFI0')

            task.start()

            data = task.read(
                timeout=timeout, number_of_samples_per_channel=N_samples)


        if save is not None:
            fn = os.path.join(
                self.save_directory, f'{save}.npy')
            np.save(fn, np.array(data))


        if self.live_queue is None:
            self.live_queue = multiprocessing.Queue()
            self.live_queue.put(data)
            
            self.livep = multiprocessing.Process(
                    target=run_plotter,
                    args=(self.live_queue,self.title))
            self.livep.start()
            
        else:
            self.live_queue.put(data)

Methods

def analog_input(self, duration, save=None, wait_trigger=False)

Records voltage input and saves it.

duration : int or float In seconds, the recording's length.

def set_settings(self, device, channels, fs)

Arguments

dev : string
The NI board name (usually "Dev1" or "Dev2").
channels : string
Channels names to record, separated by commas.
fs : float or int
The used sampling frequency in Hz (samples/second).
class Plotter
Expand source code
class Plotter:
         
    def loop(self, queue, title):
        
        fig = plt.figure()
        ax = fig.add_subplot(111)
        plt.show(block=False)

        fig.canvas.toolbar.winfo_toplevel().title(title)
        
        while True:
            if queue.empty():
                plt.pause(0.1)
                continue
            while not queue.empty():
                data = queue.get()
            if isinstance(data, str) and data == 'close':
                break

            ax.clear()
            ax.plot(data)
            plt.pause(0.1)

Methods

def loop(self, queue, title)
class VIOServer (device, port=None)

Analog voltage input/output board server.

Expand source code
class VIOServer(ServerBase):
    '''Analog voltage input/output board server.
    '''

    def __init__(self, device, port=None):

        if port is None:
            port = VIO_PORT
        super().__init__('', port, device)
        
        self.functions['analog_input'] = self.device.analog_input
        self.functions['set_settings'] = self.device.set_settings

Ancestors

Inherited members