Module gonioimsoft.camera_server
The camera server for using the camera and save images.
Functionality
- Image acquisition using Micro-Manager's Python bindings (pymmcore)
- Live feed using matplotlib
- File saving using the tifffile module
Background
On Windows, MM builds came precompiled with Python 2 support only. The camera server/client division here made it possible to run the server calling MM using Python 2 (to control the camera and image saving) and then the camera client run with Python 3.
It is still usefull; Because it uses (network) sockets, it is trivial to use another (or multiple PC) to acqurie images. Or, in future, to use another backends than MM.
Functions
def main()
def test_camera()
Classes
class CameraBase
-
Base class for all cameras
Expand source code
class CameraBase: '''Base class for all cameras ''' def __init__(self): self.settings = { 'exposure_time_scaler': 1, 'roi': None, 'binning': None} self.live_queue= False self.shower = ImageShower() # Description file string self.description_string = '' self.save_stack = False self.save_directory = None self._startdir = os.getcwd() self.title = 'Camera not set' self.servertitle = '' def get_cameras(self): raise NotImplementedError def get_camera(self): raise NotImplementedError def set_camera(self): raise NotImplementedError def get_settings(self): excl = ['roi', 'binning'] return [s for s in list(self.settings.keys()) if s not in excl] def get_setting_type(self, setting_name): raise NotImplementedError def get_setting(self, setting_name): if setting_name in self.settings: return self.settings[setting_name] return '' def set_setting(self, setting_name, value): # a) Internal setting if setting_name in self.settings: self.settings[setting_name] = value return True return False def acquire_single(self, exposure_time, save, subdir): raise NotImplementedError def _post_acquire_single(self, image, metadata, save): if not self.live_queue: self.live_queue = multiprocessing.Queue() self.live_queue.put( image[0::LIVE_DOWNSAMPLE, 0::LIVE_DOWNSAMPLE]) self.livep = multiprocessing.Process( target=self.shower.loop, args=(self.live_queue,self.title)) self.livep.start() self.live_queue.put(image[0::LIVE_DOWNSAMPLE, 0::LIVE_DOWNSAMPLE]) if save == 'True': save_thread = threading.Thread(target=self.save_images,args=([image],'snap_{}'.format(start_time.replace(':','.').replace(' ','_')), metadata,os.path.join(self.save_directory, subdir))) save_thread.start() def _post_acquire_series(self, images, metadata, label): save_thread = threading.Thread(target=self.save_images, args=(images,label,metadata,os.path.join(self.save_directory, subdir))) save_thread.start() def save_images(self, images, label, metadata, savedir): ''' Save given images as grayscale tiff images. ''' savedir = os.path.join(self._startdir, savedir) if not os.path.isdir(savedir): try: os.makedirs(savedir) except: # May fail if many local servers creating # the folders simultaneously pass if self.save_stack == False: # Save separate images for i, image in enumerate(images): fn = '{}_{}.tiff'.format(label, i) tifffile.imwrite(os.path.join(savedir, fn), image, metadata=metadata) else: # Save a stack fn = '{}_stack.tiff'.format(label) tifffile.imwrite(os.path.join(savedir, fn), np.asarray(images), metadata=metadata) self.save_description(os.path.join(savedir, 'description'), self.description_string, internal=True) def set_save_stack(self, boolean): ''' If boolean == "True", save images as stacks instead of separate images. ''' if boolean == 'True': self.save_stack = True elif boolean == 'False': self.save_stack = False else: print("Did not understand wheter to save stacks. Given {}".format(boolean)) def set_binning(self, binning): ''' Binning '2x2' for example. ''' self.settings['binning'] = binning def set_roi(self, x,y,w,h): ''' In binned pixels roi (x,y,w,h) or None ''' x = int(x) y = int(y) w = int(w) h = int(h) if w == 0 or h==0: self.settings['roi'] = None else: self.settings['roi'] = [x,y,w,h] def save_description(self, specimen_name, desc_string, internal=False): ''' Allows saving a small descriptive text file into the main saving directory. Filename should be the same as the folder where it's saved. Appends to the previous file. specimen_name DrosoM42 for example, name of the specimen folder desc_string String, what to write in the file internal If true, specimen_name becomes filename of the file ''' if internal: fn = specimen_name else: fn = os.path.join(self.save_directory, specimen_name, specimen_name) # Check if the folder exists if not os.path.exists(os.path.dirname(fn)): try: os.makedirs(os.path.dirname(fn), exist_ok=True) except: # My fail if many local servers. Let's not # care about it, another server has made it pass try: with open(fn+'.txt', 'w') as fp: fp.write(desc_string) print("Wrote file " + fn+'.txt') except: pass self.description_string = desc_string def close(self): if self.live_queue: self.live_queue.put('close') def wait_for_client(self): pass
Subclasses
Methods
def acquire_single(self, exposure_time, save, subdir)
def close(self)
def get_camera(self)
def get_cameras(self)
def get_setting(self, setting_name)
def get_setting_type(self, setting_name)
def get_settings(self)
def save_description(self, specimen_name, desc_string, internal=False)
-
Allows saving a small descriptive text file into the main saving directory. Filename should be the same as the folder where it's saved.
Appends to the previous file.
specimen_name DrosoM42 for example, name of the specimen folder desc_string String, what to write in the file internal If true, specimen_name becomes filename of the file
def save_images(self, images, label, metadata, savedir)
-
Save given images as grayscale tiff images.
def set_binning(self, binning)
-
Binning '2x2' for example.
def set_camera(self)
def set_roi(self, x, y, w, h)
-
In binned pixels roi (x,y,w,h) or None
def set_save_stack(self, boolean)
-
If boolean == "True", save images as stacks instead of separate images.
def set_setting(self, setting_name, value)
def wait_for_client(self)
class CameraServer (camera, port=None)
-
Camera server listens incoming connections from the client and controls a camera class.
Expand source code
class CameraServer(ServerBase): '''Camera server listens incoming connections from the client and controls a camera class. ''' def __init__(self, camera, port=None): if port is None: port = CAMERA_PORT super().__init__('', port, camera) print(f'Using the camera <{camera.__class__.__name__}>') self.cam = self.device self.cam.servertitle = f'Server on port {port}' self.cam.wait_for_client = self.wait_for_client added_functions = {'acquireSeries': self.cam.acquire_series, 'acquireSingle': self.cam.acquire_single, 'saveDescription': self.cam.save_description, 'set_roi': self.cam.set_roi, 'set_save_stack': self.cam.set_save_stack, 'get_cameras': self.cam.get_cameras, 'get_camera': self.cam.get_camera, 'set_camera': self.cam.set_camera, 'get_settings': self.cam.get_settings, 'get_setting_type': self.cam.get_setting_type, 'get_setting': self.cam.get_setting, 'set_setting': self.cam.set_setting, } self.functions = {**self.functions, **added_functions} self.responders.extend( ['get_cameras', 'get_camera', 'get_settings', 'get_setting_type', 'get_setting'] )
Ancestors
Inherited members
class DummyCamera
-
A dummy camera suitable for testing the server/client.
Expand source code
class DummyCamera(CameraBase): '''A dummy camera suitable for testing the server/client. ''' def __init__(self): super().__init__() self.settings = {'setting1' : 'na', 'setting2': 0.0, 'setting3': 1} self.camera = None def acquire_single(self, exposure_time, save, subdir): start_time = str(datetime.datetime.now()) image = np.random.randint(0,2**10-1, (512,512)) metadata = {'exposure_time_s': exposure_time, 'function': 'acquireSingle-dummy', 'start_time': start_time} self._post_acquire_single(image, metadata, save) def acquire_series(self, exposure_time, image_interval, N_frames, label, subdir): start_time = str(datetime.datetime.now()) images = [np.random.randint(0,2**10-1, (512,512)) for i in range(10)] metadata = {'exposure_time_s': exposure_time, 'image_interval_s': image_interval, 'N_frames': N_frames, 'label': label, 'function': 'acquireSeries-dummy', 'start_time': start_time} metadata.update(self.settings) self._post_acquire_series(images, metadata, label) def get_cameras(self): return ['dummy1', 'dummy2'] def get_camera(self): return self.camera def set_camera(self, name): self.camera = name def get_setting_type(self, setting_name): base = super().get_setting_type() if base: return base elif setting_name == 'setting1': return 'string' elif setting_name == 'setting2': return 'float' elif setting_name == 'setting3': return 'integer' else: print('Invalid setting') return ''
Ancestors
Methods
def acquire_series(self, exposure_time, image_interval, N_frames, label, subdir)
def acquire_single(self, exposure_time, save, subdir)
def get_camera(self)
def get_cameras(self)
def get_setting_type(self, setting_name)
def set_camera(self, name)
Inherited members
class ImageShower
-
Shows images on the screen in its own window.
In future, may be used to select ROIs as well to allow higher frame rate imaging / less data.
Working Principle
Image shower works so that self.loop is started as a separate process using multiprocessing library
Methods
self.loop Set this as multiprocessing target
Expand source code
class ImageShower: '''Shows images on the screen in its own window. In future, may be used to select ROIs as well to allow higher frame rate imaging / less data. ------------------ Working principle ------------------ Image shower works so that self.loop is started as a separate process using multiprocessing library ------- Methods ------- self.loop Set this as multiprocessing target ''' def __init__(self): self.fig = plt.figure() self.ax = self.fig.add_subplot(111) self.close = False #self.cid = self.fig.canvas.mpl_connect('key_press_event', self.callbackButtonPressed) self.image_brightness = 0 self.image_maxval = 1 self.selection = None self.image_size = None def callbackButtonPressed(self, event): if event.key == 'r': self.image_maxval -= 0.05 self._updateImage(strong=True) elif event.key == 't': self.image_maxval += 0.05 self._updateImage(strong=True) def __onSelectRectangle(self, eclick, erelease): # Get selection box coordinates and set the box inactive x1, y1 = eclick.xdata, eclick.ydata x2, y2 = erelease.xdata, erelease.ydata #self.rectangle.set_active(False) x = int(min((x1, x2))) y = int(min((y1, y2))) width = int(abs(x2-x1)) height = int(abs(y2-y1)) self.selection = [x, y, width, height] def _updateImage(self, i): data = None while not self.queue.empty(): # Get the latest image in the queue data = self.queue.get(True, timeout=0.01) if data is None: return self.im, '' elif isinstance(data, str) and data == 'close': self.close = True return self.im, '' if self.selection and data.size != self.image_size: self.selection = None if self.selection: x,y,w,h = self.selection if w<1 or h<1: # If selection box empty (accidental click on the image) # use the whole image instead inspect_area = data else: inspect_area = data[y:y+h, x:x+w] else: inspect_area = data per95 = np.percentile(inspect_area, 95) per5 = np.percentile(inspect_area, 5) data = np.clip(data, per5, per95) data -= per5 data /= (per95-per5) self.image_size = data.size self.im.set_array(data) self.fig.suptitle('Selection 95th percentile: {}'.format(per95), fontsize=10) text = '' return self.im, text def loop(self, queue, title): ''' Runs the ImageShower by reading images from the given queue. Set this as a multiprocessing target. queue Multiprocessing queue with a get method. ''' self.queue = queue self.rectangle = RectangleSelector(self.ax, self.__onSelectRectangle, useblit=True) image = queue.get() self.im = plt.imshow(1000*image/np.max(image), cmap='gray', vmin=0, vmax=1, interpolation='none', aspect='auto') self.ani = FuncAnimation(plt.gcf(), self._updateImage, frames=range(100), interval=50, blit=False) self.fig.canvas.toolbar.winfo_toplevel().title(title) # Remove the toolbar; Gives more space when having many cameras self.fig.canvas.toolbar.pack_forget() # Take away all white space plt.subplots_adjust(top=1, bottom=0, right=1,left=0, hspace=0, wspace=0) plt.show(block=True)
Methods
def callbackButtonPressed(self, event)
def loop(self, queue, title)
-
Runs the ImageShower by reading images from the given queue. Set this as a multiprocessing target.
queue Multiprocessing queue with a get method.
class MMCamera
-
Controls any camera using MicroManager and its pymmcore bindings.
Expand source code
class MMCamera(CameraBase): '''Controls any camera using MicroManager and its pymmcore bindings. ''' def __init__(self): super().__init__() self.mmc = pymmcore.CMMCore() self.mmc.setDeviceAdapterSearchPaths([DEFAULT_MICROMANAGER_DIR]) self._device_name = None self._configuration_name = '' #self.mmc.loadDevice('Camera', 'HamamatsuHam', 'HamamatsuHam_DCAM') #self.mmc.initializeAllDevices() #self.mmc.setCameraDevice('Camera') #self.mmc.setCircularBufferMemoryFootprint(4000) def get_cameras(self): '''Lists available MicroManager configuration files in . ''' return [fn for fn in os.listdir(DEFAULT_MICROMANAGER_DIR) if fn.endswith('.cfg')] def get_camera(self): '''Returns the label of the current camera. ''' return self._configuration_name def set_camera(self, name): '''Set the provided configuration file. ''' if not os.path.exists(name): path = os.path.join(DEFAULT_MICROMANAGER_DIR, name) if not os.path.exists(path): print(f'Couldnt open configuration file named: {path}') return self.mmc.loadSystemConfiguration(path) self._device_name = self.mmc.getCameraDevice() self._configuration_name = name self.mmc.prepareSequenceAcquisition(self._device_name) self.title = f'{name} ({self._device_name}) | {self.servertitle}' def get_settings(self): '''Returns device property names ''' own_settings = super().get_settings() if self._device_name is None: return '' properties = self.mmc.getDevicePropertyNames(self._device_name) return list(properties) + own_settings def get_setting_type(self, setting_name): '''Returns "string", "integer" or "float". Returns an empty string if the setting does not exist. ''' if setting_name in self.settings: return 'float' try: num = self.mmc.getPropertyType(self._device_name, setting_name) except RuntimeError as e: print(f'Error! No setting named: {setting_name}') return '' if num == 1: return 'string' elif num == 2: return 'float' elif num == 3: return 'integer' def get_setting(self, setting_name): base = super().get_setting(setting_name) if base is not None: return base return self.mmc.getProperty(self._device_name, setting_name) def set_setting(self, setting_name, value): # a) Internal setting if super().set_setting(setting_name, value): return # b) Camera device setting type_name = self.get_setting_type(setting_name) if type_name == 'float': value = float(value) elif type_name == 'integer': value = int(value) elif type_name == '': print('Error! No setting named: {setting_name}') return print(f'Changing {setting_name} to its new value {value}') try: self.mmc.setProperty(self._device_name, setting_name, value) except RuntimeError as e: print('Error! The set value likely out of range.') print(e) def acquire_single(self, exposure_time, save, subdir): ''' Acquire a single image. save 'True' or 'False' subdir Subdirectory for saving ''' exposure_time = float(exposure_time) self.mmc.setExposure(exposure_time*1000) #binning = '2x2' #self.set_binning(binning) start_time = str(datetime.datetime.now()) self.mmc.snapImage() image = self.mmc.getImage() metadata = {'exposure_time_s': exposure_time, 'function': 'acquireSingle', 'start_time': start_time} self._post_acquire_single(image, metadata, save) def acquire_series(self, exposure_time, image_interval, N_frames, label, subdir): ''' Acquire a series of images exposure_time How many seconds to expose each image image_interval How many seconds to wait in between the exposures N_frames How many images to take label Label for saving the images (part of the filename later) subdir ''' exposure_time = float(exposure_time) image_interval = float(image_interval) N_frames = int(N_frames) label = str(label) print("Now aquire_series with label " + label) print("- IMAGING PARAMETERS -") print(" exposure time " + str(exposure_time) + " seconds") print(" image interval " + str(image_interval) + " seconds") print(" N_frames " + str(N_frames)) print("- CAMERA SETTINGS") #self.set_binning('2x2') #print(" Pixel binning 2x2") device_name = self.mmc.getDeviceName(self._device_name) print(f'Device name {device_name}') #if 'hamamatsu' in device_name.lower(): # if trigger_direction == 'send': # print(" Camera sending a trigger pulse") # self.mmc.setProperty(self._device_name, "OUTPUT TRIGGER KIND[0]","EXPOSURE") # self.mmc.setProperty(self._device_name, "OUTPUT TRIGGER POLARITY[0]","NEGATIVE") # elif trigger_direction== 'receive': # print(" Camera recieving / waiting for a trigger pulse") # self.mmc.setProperty(self._device_name, "TRIGGER SOURCE","EXTERNAL") # self.mmc.setProperty(self._device_name, "TriggerPolarity","POSITIVE") # elif trigger_direction == 'none': # pass # else: # raise ValueError('trigger_direction has to be send, receive or none, not {trigger_direction}') print("Circular buffer " + str(self.mmc.getCircularBufferMemoryFootprint()) + " MB") scaler = float(self.settings['exposure_time_scaler']) exposure = scaler*exposure_time*1000 self.mmc.setExposure(exposure) self.mmc.clearCircularBuffer() #self.mmc.prepareSequenceAcquisition(self._device_name) self.wait_for_client() start_time = str(datetime.datetime.now()) self.mmc.startSequenceAcquisition(N_frames, image_interval+(1-scaler)*exposure, False) while self.mmc.isSequenceRunning(): self.mmc.sleep(1000*exposure_time) self.mmc.sleep(1000) images = [] for i in range(N_frames): while True: try: image = self.mmc.popNextImage() break except: # Index error for example when circular buffer is still empty self.mmc.sleep(1000*exposure_time) images.append(image) metadata = {'exposure_time_s': exposure_time, 'image_interval_s': image_interval, 'N_frames': N_frames, 'label': label, 'function': 'acquireSeries', 'start_time': start_time} metadata.update(self.settings) self._post_acquire_series(images, metadata, label) #if 'hamamatsu' in device_name.lower() and trigger_direction == 'receive': # self.mmc.setProperty(self._device_name, "TRIGGER SOURCE","INTERNAL") print('acquired') def set_binning(self, binning): ''' Binning '2x2' for example. ''' if not self.settings['binning'] == binning: self.mmc.setProperty(self._device_name, 'Binning', binning) super().set_binning(binning) def set_roi(self, x,y,w,h): ''' In binned pixels roi (x,y,w,h) or None ''' super().set_roi(x,y,w,h) x = int(x) y = int(y) w = int(w) h = int(h) if w == 0 or h==0: self.mmc.clearROI() else: self.mmc.setROI(x,y,w,h)
Ancestors
Methods
def acquire_series(self, exposure_time, image_interval, N_frames, label, subdir)
-
Acquire a series of images
exposure_time How many seconds to expose each image image_interval How many seconds to wait in between the exposures N_frames How many images to take label Label for saving the images (part of the filename later) subdir
def acquire_single(self, exposure_time, save, subdir)
-
Acquire a single image.
save 'True' or 'False' subdir Subdirectory for saving
def get_camera(self)
-
Returns the label of the current camera.
def get_cameras(self)
-
Lists available MicroManager configuration files in .
def get_setting(self, setting_name)
def get_setting_type(self, setting_name)
-
Returns "string", "integer" or "float".
Returns an empty string if the setting does not exist.
def get_settings(self)
-
Returns device property names
def set_camera(self, name)
-
Set the provided configuration file.
def set_setting(self, setting_name, value)
Inherited members