Module gonioimsoft.vio_server
Voltage input/output server.
Functions
def main()
def run_plotter(queue, title)
Classes
class DummyBoard
-
For testing the server without an NI board.
For documentation, see the NIBoard class.
Expand source code
class DummyBoard: '''For testing the server without an NI board. For documentation, see the NIBoard class. ''' def analog_input(self, duration, wait_trigger=False): print('DummyBoard.analog_input(...)') print(f'dur={duration} | wait_trigger={wait_trigger}') def set_settings(self, device, channels, fs): print('DummyBoard.set_settings(...)') print(f'dvs={device} | chs={channels} | fs={fs}')
Methods
def analog_input(self, duration, wait_trigger=False)
def set_settings(self, device, channels, fs)
class NIBoard
-
Expand source code
class NIBoard: def __init__(self): self.device = 'Dev2' self.channels = ['ai0'] self.fs = 1000 self.live_queue = None self.title = 'Analog input' def set_settings(self, device, channels, fs): ''' Arguments --------- dev : string The NI board name (usually "Dev1" or "Dev2"). channels : string Channels names to record, separated by commas. fs : float or int The used sampling frequency in Hz (samples/second). ''' self.device = device self.channels = channels.split(',') self.fs = float(fs) self.title = f'Analog input - {self.device} {self.channels} {self.fs} Hz' def analog_input(self, duration, save=None, wait_trigger=False): '''Records voltage input and saves it. duration : int or float In seconds, the recording's length. ''' if save == 'None': save = None duration = float(duration) timeout = duration + 10 N_channels = len(self.channels) N_samples = int(duration * self.fs) if str(wait_trigger).lower() == 'true': wait_trigger = True else: wait_trigger = False with nidaqmx.Task() as task: for channel in self.channels: task.ai_channels.add_ai_voltage_chan(f'{self.device}/{channel}') task.timing.cfg_samp_clk_timing( self.fs, samps_per_chan=N_samples) task.ai_conv_rate = self.fs if wait_trigger: task.triggers.start_trigger.cfg_dig_edge_start_trig( f'/{self.device}/PFI0') task.start() data = task.read( timeout=timeout, number_of_samples_per_channel=N_samples) if save is not None: fn = os.path.join( self.save_directory, f'{save}.npy') np.save(fn, np.array(data)) if self.live_queue is None: self.live_queue = multiprocessing.Queue() self.live_queue.put(data) self.livep = multiprocessing.Process( target=run_plotter, args=(self.live_queue,self.title)) self.livep.start() else: self.live_queue.put(data)
Methods
def analog_input(self, duration, save=None, wait_trigger=False)
-
Records voltage input and saves it.
duration : int or float In seconds, the recording's length.
def set_settings(self, device, channels, fs)
-
Arguments
dev
:string
- The NI board name (usually "Dev1" or "Dev2").
channels
:string
- Channels names to record, separated by commas.
fs
:float
orint
- The used sampling frequency in Hz (samples/second).
class Plotter
-
Expand source code
class Plotter: def loop(self, queue, title): fig = plt.figure() ax = fig.add_subplot(111) plt.show(block=False) fig.canvas.toolbar.winfo_toplevel().title(title) while True: if queue.empty(): plt.pause(0.1) continue while not queue.empty(): data = queue.get() if isinstance(data, str) and data == 'close': break ax.clear() ax.plot(data) plt.pause(0.1)
Methods
def loop(self, queue, title)
class VIOServer (device, port=None)
-
Analog voltage input/output board server.
Expand source code
class VIOServer(ServerBase): '''Analog voltage input/output board server. ''' def __init__(self, device, port=None): if port is None: port = VIO_PORT super().__init__('', port, device) self.functions['analog_input'] = self.device.analog_input self.functions['set_settings'] = self.device.set_settings
Ancestors
Inherited members