Module gonioimsoft.camera_server
The camera server for using the camera and saving images.
Functionality
- Image acquisition using Micro-Manager's Python bindings (pymmcore)
- Live feed using matplotlib
- File saving using the tifffile module
Background
On Windows, MM builds came precompiled with Python 2 support only. The camera server/client division made it possible to run the server (which calls MM to control the camera and save images) under Python 2, while the camera client ran under Python 3.
It is still useful: because it uses (network) sockets, it is trivial to use another PC (or multiple PCs) to acquire images, or, in the future, to use backends other than MM.
Functions
def main()def test_camera()
Classes
class CameraBase-
Base class for all cameras
Expand source code
class CameraBase:
    '''Base class for all cameras.

    Holds the internal settings dictionary, the live view queue/process
    and the image saving logic shared by the camera implementations.
    Camera specific methods raise NotImplementedError here and are
    expected to be provided by subclasses.
    '''

    def __init__(self):
        self.settings = {
                'exposure_time_scaler': 1,
                'roi': None,
                'binning': None}

        # False until the first image has been acquired; after that
        # a multiprocessing.Queue feeding the live view process
        self.live_queue = False
        self.shower = ImageShower()

        # Description file string
        self.description_string = ''

        self.save_stack = False
        self.save_directory = None

        # Relative save directories are resolved against this
        self._startdir = os.getcwd()

        self.title = 'Camera not set'
        self.servertitle = ''

    def get_cameras(self):
        raise NotImplementedError

    def get_camera(self):
        raise NotImplementedError

    def set_camera(self, name=None):
        raise NotImplementedError

    def get_settings(self):
        '''Returns the names of the internal settings, excluding
        'roi' and 'binning'.
        '''
        excluded = ('roi', 'binning')
        return [name for name in self.settings if name not in excluded]

    def get_setting_type(self, setting_name):
        raise NotImplementedError

    def get_setting(self, setting_name):
        '''Returns the value of an internal setting, or an empty
        string if there is no internal setting with that name.
        '''
        if setting_name in self.settings:
            return self.settings[setting_name]
        return ''

    def set_setting(self, setting_name, value):
        '''Sets an internal setting.

        Returns True if setting_name was an internal setting (and was
        set), False otherwise.
        '''
        # a) Internal setting
        if setting_name in self.settings:
            self.settings[setting_name] = value
            return True
        return False

    def acquire_single(self, exposure_time, save, subdir):
        raise NotImplementedError

    def acquire_series(self, exposure_time, image_interval, N_frames,
                       label, subdir):
        raise NotImplementedError

    def _target_directory(self, subdir=''):
        '''Returns save_directory joined with subdir.

        An unset (None) save_directory or subdir counts as an empty
        path part, so that the result stays a valid relative path.
        '''
        return os.path.join(self.save_directory or '', subdir or '')

    def _post_acquire_single(self, image, metadata, save, subdir=''):
        '''Sends the image to the live view and optionally saves it.

        image       2D image array
        metadata    Dictionary saved along the image. Its 'start_time'
                    item (if any) is used in the file name.
        save        The string 'True' to save the image
        subdir      Subdirectory (under save_directory) for saving
        '''
        preview = image[0::LIVE_DOWNSAMPLE, 0::LIVE_DOWNSAMPLE]

        if not self.live_queue:
            # First image: start the live view process. ImageShower.loop
            # consumes one image to create the figure, which is why the
            # image is put in the queue here and once more below.
            self.live_queue = multiprocessing.Queue()
            self.live_queue.put(preview)

            self.livep = multiprocessing.Process(
                    target=self.shower.loop,
                    args=(self.live_queue, self.title))
            self.livep.start()

        self.live_queue.put(preview)

        if save == 'True':
            start_time = str(
                    metadata.get('start_time') or datetime.datetime.now())
            label = 'snap_{}'.format(
                    start_time.replace(':', '.').replace(' ', '_'))

            save_thread = threading.Thread(
                    target=self.save_images,
                    args=([image], label, metadata,
                          self._target_directory(subdir)))
            save_thread.start()

    def _post_acquire_series(self, images, metadata, label, subdir=''):
        '''Saves the acquired image series in a background thread.

        images      List of image arrays
        metadata    Dictionary saved along the images
        label       Label used in the file names
        subdir      Subdirectory (under save_directory) for saving
        '''
        save_thread = threading.Thread(
                target=self.save_images,
                args=(images, label, metadata,
                      self._target_directory(subdir)))
        save_thread.start()

    def save_images(self, images, label, metadata, savedir):
        '''
        Save given images as grayscale tiff images.

        Depending on self.save_stack, the images are written as
        separate files (label_i.tiff) or as one stack
        (label_stack.tiff). The current description string is written
        in the same directory as well.
        '''
        savedir = os.path.join(self._startdir, savedir)
        try:
            # exist_ok: many local servers may be creating
            # the folders simultaneously
            os.makedirs(savedir, exist_ok=True)
        except OSError as error:
            print('Could not create directory {} ({})'.format(
                savedir, error))

        if not self.save_stack:
            # Save separate images
            for i, image in enumerate(images):
                fn = '{}_{}.tiff'.format(label, i)
                tifffile.imwrite(os.path.join(savedir, fn), image,
                                 metadata=metadata)
        else:
            # Save a stack
            fn = '{}_stack.tiff'.format(label)
            tifffile.imwrite(os.path.join(savedir, fn),
                             np.asarray(images), metadata=metadata)

        self.save_description(os.path.join(savedir, 'description'),
                              self.description_string, internal=True)

    def set_save_stack(self, boolean):
        '''
        If boolean == "True", save images as stacks instead of
        separate images.
        '''
        if boolean == 'True':
            self.save_stack = True
        elif boolean == 'False':
            self.save_stack = False
        else:
            print("Did not understand wheter to save stacks. Given {}".format(boolean))

    def set_binning(self, binning):
        '''
        Binning '2x2' for example.
        '''
        self.settings['binning'] = binning

    def set_roi(self, x, y, w, h):
        '''
        In binned pixels
        roi     (x,y,w,h) or None

        Zero width or height clears the ROI (sets it to None).
        '''
        x = int(x)
        y = int(y)
        w = int(w)
        h = int(h)

        if w == 0 or h == 0:
            self.settings['roi'] = None
        else:
            self.settings['roi'] = [x, y, w, h]

    def save_description(self, specimen_name, desc_string, internal=False):
        '''
        Allows saving a small descriptive text file into the main
        saving directory. Filename should be the same as the folder
        where it's saved.

        Overwrites the previous file (the file is opened in 'w' mode).

        specimen_name   DrosoM42 for example, name of the specimen folder
        desc_string     String, what to write in the file
        internal        If true, specimen_name becomes filename of the file
        '''
        if internal:
            fn = specimen_name
        else:
            fn = os.path.join(self.save_directory or '',
                              specimen_name, specimen_name)

        # Make sure the folder exists
        folder = os.path.dirname(fn)
        if folder:
            try:
                os.makedirs(folder, exist_ok=True)
            except OSError:
                # May fail if many local servers. Let's not care about
                # it here; a failing write is reported below
                pass

        try:
            with open(fn+'.txt', 'w') as fp:
                fp.write(desc_string)
            print("Wrote file " + fn+'.txt')
        except OSError as error:
            print("Could not write file " + fn+'.txt' + " ({})".format(error))

        self.description_string = desc_string

    def close(self):
        '''Asks the live view process (if started) to close.
        '''
        if self.live_queue:
            self.live_queue.put('close')

    def wait_for_client(self):
        pass
Methods
def acquire_single(self, exposure_time, save, subdir)def close(self)def get_camera(self)def get_cameras(self)def get_setting(self, setting_name)def get_setting_type(self, setting_name)def get_settings(self)def save_description(self, specimen_name, desc_string, internal=False)-
Allows saving a small descriptive text file into the main saving directory. Filename should be the same as the folder where it's saved.
Overwrites the previous file (the file is opened in write mode).
specimen_name DrosoM42 for example, name of the specimen folder desc_string String, what to write in the file internal If true, specimen_name becomes filename of the file
def save_images(self, images, label, metadata, savedir)-
Save given images as grayscale tiff images.
def set_binning(self, binning)-
Binning '2x2' for example.
def set_camera(self)def set_roi(self, x, y, w, h)-
In binned pixels roi (x,y,w,h) or None
def set_save_stack(self, boolean)-
If boolean == "True", save images as stacks instead of separate images.
def set_setting(self, setting_name, value)def wait_for_client(self)
class CameraServer (camera, port=None)-
Camera server listens incoming connections from the client and controls a camera class.
Expand source code
class CameraServer(ServerBase):
    '''Server that accepts client connections and drives a camera object.

    The camera given to the constructor is handed to the base server as
    its device; its methods are then registered under the command names
    the clients use.
    '''

    def __init__(self, camera, port=None):
        port = CAMERA_PORT if port is None else port

        super().__init__('', port, camera)
        print(f'Using the camera <{camera.__class__.__name__}>')

        self.cam = self.device
        cam = self.cam
        cam.servertitle = f'Server on port {port}'
        # Let the camera call back to the server's waiting routine
        cam.wait_for_client = self.wait_for_client

        # Command name -> camera method
        camera_commands = dict(
            acquireSeries=cam.acquire_series,
            acquireSingle=cam.acquire_single,
            saveDescription=cam.save_description,
            set_roi=cam.set_roi,
            set_save_stack=cam.set_save_stack,
            get_cameras=cam.get_cameras,
            get_camera=cam.get_camera,
            set_camera=cam.set_camera,
            get_settings=cam.get_settings,
            get_setting_type=cam.get_setting_type,
            get_setting=cam.get_setting,
            set_setting=cam.set_setting,
        )

        # Build a new dictionary; camera commands take precedence
        combined = dict(self.functions)
        combined.update(camera_commands)
        self.functions = combined

        # The getter commands are additionally listed as responders
        getter_names = [
            'get_cameras',
            'get_camera',
            'get_settings',
            'get_setting_type',
            'get_setting',
        ]
        self.responders.extend(getter_names)
Inherited members
class DummyCamera-
A dummy camera suitable for testing the server/client.
Expand source code
class DummyCamera(CameraBase):
    '''A dummy camera suitable for testing the server/client.

    Produces random 512x512 images instead of talking to real hardware.
    '''

    # Types of the dummy settings, as returned by get_setting_type
    _SETTING_TYPES = {
        'setting1': 'string',
        'setting2': 'float',
        'setting3': 'integer',
    }

    def __init__(self):
        super().__init__()
        # Replaces the settings dictionary created by the base class
        self.settings = {'setting1': 'na', 'setting2': 0.0, 'setting3': 1}
        self.camera = None

    def acquire_single(self, exposure_time, save, subdir):
        '''Creates one random image and passes it to the live view
        (and saving) of the base class.
        '''
        start_time = str(datetime.datetime.now())

        image = np.random.randint(0, 2**10-1, (512, 512))

        metadata = {'exposure_time_s': exposure_time,
                    'function': 'acquireSingle-dummy',
                    'start_time': start_time}

        # NOTE(review): subdir is accepted but not forwarded here
        self._post_acquire_single(image, metadata, save)

    def acquire_series(self, exposure_time, image_interval, N_frames,
                       label, subdir):
        '''Creates N_frames random images and passes them to the
        saving of the base class.
        '''
        # The frame count may be given as a string
        n_frames = int(N_frames)

        start_time = str(datetime.datetime.now())

        images = [np.random.randint(0, 2**10-1, (512, 512))
                  for _ in range(n_frames)]

        metadata = {'exposure_time_s': exposure_time,
                    'image_interval_s': image_interval,
                    'N_frames': n_frames,
                    'label': label,
                    'function': 'acquireSeries-dummy',
                    'start_time': start_time}
        metadata.update(self.settings)

        # NOTE(review): subdir is accepted but not forwarded here
        self._post_acquire_series(images, metadata, label)

    def get_cameras(self):
        return ['dummy1', 'dummy2']

    def get_camera(self):
        return self.camera

    def set_camera(self, name):
        self.camera = name

    def get_setting_type(self, setting_name):
        '''Returns "string", "float" or "integer" for the dummy
        settings, or an empty string for an unknown setting name.
        '''
        try:
            return self._SETTING_TYPES[setting_name]
        except KeyError:
            print('Invalid setting')
            return ''
Methods
def acquire_series(self, exposure_time, image_interval, N_frames, label, subdir)def acquire_single(self, exposure_time, save, subdir)def get_camera(self)def get_cameras(self)def get_setting_type(self, setting_name)def set_camera(self, name)
Inherited members
class ImageShower-
Shows images on the screen in its own window.
In future, may be used to select ROIs as well to allow higher frame rate imaging / less data.
Working Principle
Image shower works so that self.loop is started as a separate process using multiprocessing library
Methods
self.loop Set this as multiprocessing target
Expand source code
class ImageShower:
    '''Shows images on the screen in its own window.

    In future, may be used to select ROIs as well to allow
    higher frame rate imaging / less data.

    ------------------
    Working principle
    ------------------
    Image shower works so that self.loop is started as a separate process
    using multiprocessing library

    -------
    Methods
    -------
    self.loop       Set this as multiprocessing target
    '''

    def __init__(self):
        self.fig = plt.figure()
        self.ax = self.fig.add_subplot(111)
        self.close = False

        #self.cid = self.fig.canvas.mpl_connect('key_press_event', self.callbackButtonPressed)

        self.image_brightness = 0
        self.image_maxval = 1

        # [x, y, width, height] of the rectangle drawn on the image, or None
        self.selection = None

        # Number of pixels in the previously shown image
        self.image_size = None

    def callbackButtonPressed(self, event):
        '''Key press handler: 'r' decreases and 't' increases
        image_maxval by 0.05, followed by an image update.
        '''
        if event.key == 'r':
            self.image_maxval -= 0.05
            self._updateImage(strong=True)

        elif event.key == 't':
            self.image_maxval += 0.05
            self._updateImage(strong=True)

    def __onSelectRectangle(self, eclick, erelease):
        '''Stores the rectangle selected on the image in self.selection.

        eclick, erelease    Events having xdata and ydata attributes
        '''
        # Get selection box coordinates and set the box inactive
        x1, y1 = eclick.xdata, eclick.ydata
        x2, y2 = erelease.xdata, erelease.ydata

        #self.rectangle.set_active(False)

        x = int(min((x1, x2)))
        y = int(min((y1, y2)))
        width = int(abs(x2-x1))
        height = int(abs(y2-y1))

        self.selection = [x, y, width, height]

    def _updateImage(self, i=None, strong=False):
        '''Shows the latest image waiting in the queue.

        i           Frame number given by the animation (not used)
        strong      Accepted because callbackButtonPressed passes it
                    (not used)

        Returns the image artist and an (empty) text.
        '''
        data = None
        while not self.queue.empty():
            # Get the latest image in the queue
            data = self.queue.get(True, timeout=0.01)

        if data is None:
            return self.im, ''
        elif isinstance(data, str) and data == 'close':
            self.close = True
            return self.im, ''

        # A selection made on an image of a different size is dropped
        if self.selection and data.size != self.image_size:
            self.selection = None

        inspect_area = data
        if self.selection:
            x, y, w, h = self.selection
            # If selection box empty (accidental click on the image)
            # use the whole image instead
            if w >= 1 and h >= 1:
                cropped = data[y:y+h, x:x+w]
                # A selection lying outside the image gives an empty
                # array, whose percentiles cannot be computed
                if cropped.size > 0:
                    inspect_area = cropped

        per95 = np.percentile(inspect_area, 95)
        per5 = np.percentile(inspect_area, 5)

        # Stretch the 5th-95th percentile range to 0-1. Computed in
        # floats; a flat image (zero range) is left as all zeros
        # instead of dividing by zero.
        data = np.clip(np.asarray(data, dtype=float), per5, per95)
        data -= per5
        span = per95 - per5
        if span > 0:
            data /= span

        self.image_size = data.size

        self.im.set_array(data)
        self.fig.suptitle(
                'Selection 95th percentile: {}'.format(per95), fontsize=10)
        text = ''
        return self.im, text

    def loop(self, queue, title):
        '''
        Runs the ImageShower by reading images from the given queue.
        Set this as a multiprocessing target.

        queue           Multiprocessing queue with a get method.
        title           Title of the window
        '''
        self.queue = queue
        self.rectangle = RectangleSelector(
                self.ax, self.__onSelectRectangle, useblit=True)

        # Blocks until the first image arrives
        image = queue.get()
        self.im = plt.imshow(
                1000*image/np.max(image), cmap='gray', vmin=0, vmax=1,
                interpolation='none', aspect='auto')
        self.ani = FuncAnimation(
                plt.gcf(), self._updateImage, frames=range(100),
                interval=50, blit=False)

        # NOTE(review): winfo_toplevel/pack_forget look Tk specific -
        # presumably requires the Tk matplotlib backend; confirm
        self.fig.canvas.toolbar.winfo_toplevel().title(title)

        # Remove the toolbar; Gives more space when having many cameras
        self.fig.canvas.toolbar.pack_forget()

        # Take away all white space
        plt.subplots_adjust(top=1, bottom=0, right=1, left=0,
                            hspace=0, wspace=0)

        plt.show(block=True)
def callbackButtonPressed(self, event)def loop(self, queue, title)-
Runs the ImageShower by reading images from the given queue. Set this as a multiprocessing target.
queue Multiprocessing queue with a get method.
class MMCamera-
Controls any camera using MicroManager and its pymmcore bindings.
Expand source code
class MMCamera(CameraBase):
    '''Controls any camera using MicroManager and its pymmcore bindings.

    A camera is selected by loading a MicroManager configuration file
    (see set_camera). Settings are either the internal settings of the
    base class or properties of the camera device.
    '''

    # pymmcore property type number -> type name
    _PROPERTY_TYPES = {1: 'string', 2: 'float', 3: 'integer'}

    def __init__(self):
        super().__init__()

        self.mmc = pymmcore.CMMCore()
        self.mmc.setDeviceAdapterSearchPaths([DEFAULT_MICROMANAGER_DIR])

        self._device_name = None
        self._configuration_name = ''

        #self.mmc.loadDevice('Camera', 'HamamatsuHam', 'HamamatsuHam_DCAM')
        #self.mmc.initializeAllDevices()
        #self.mmc.setCameraDevice('Camera')
        #self.mmc.setCircularBufferMemoryFootprint(4000)

    def get_cameras(self):
        '''Lists available MicroManager configuration files (.cfg)
        in DEFAULT_MICROMANAGER_DIR.
        '''
        return [fn for fn in os.listdir(DEFAULT_MICROMANAGER_DIR)
                if fn.endswith('.cfg')]

    def get_camera(self):
        '''Returns the name of the currently set configuration.
        '''
        return self._configuration_name

    def set_camera(self, name):
        '''Set the provided configuration file.

        name    Path to a configuration file, or the name of a file
                in DEFAULT_MICROMANAGER_DIR
        '''
        path = name
        if not os.path.exists(path):
            # Not a path as such; look in the MicroManager directory
            path = os.path.join(DEFAULT_MICROMANAGER_DIR, name)
            if not os.path.exists(path):
                print(f'Couldnt open configuration file named: {path}')
                return

        self.mmc.loadSystemConfiguration(path)
        self._device_name = self.mmc.getCameraDevice()
        self._configuration_name = name

        self.mmc.prepareSequenceAcquisition(self._device_name)

        self.title = f'{name} ({self._device_name}) | {self.servertitle}'

    def get_settings(self):
        '''Returns device property names followed by the internal
        setting names. Returns an empty string if no camera is set.
        '''
        own_settings = super().get_settings()

        if self._device_name is None:
            return ''

        properties = self.mmc.getDevicePropertyNames(self._device_name)
        return list(properties) + own_settings

    def get_setting_type(self, setting_name):
        '''Returns "string", "integer" or "float".

        Internal settings are always reported as "float". Returns an
        empty string if the setting does not exist, and None if the
        property type number is not one of 1, 2 or 3.
        '''
        if setting_name in self.settings:
            return 'float'
        try:
            num = self.mmc.getPropertyType(self._device_name, setting_name)
        except RuntimeError:
            print(f'Error! No setting named: {setting_name}')
            return ''

        return self._PROPERTY_TYPES.get(num)

    def get_setting(self, setting_name):
        '''Returns the value of an internal setting or, if there is
        no internal setting by that name, of a camera device property.
        '''
        # The base class returns '' (not None) for unknown names, so
        # the membership has to be tested explicitly
        if setting_name in self.settings:
            return super().get_setting(setting_name)

        return self.mmc.getProperty(self._device_name, setting_name)

    def set_setting(self, setting_name, value):
        '''Sets an internal setting or a camera device property.

        The value is converted to float or int first if the type of
        the device property so requires.
        '''
        # a) Internal setting
        if super().set_setting(setting_name, value):
            return

        # b) Camera device setting
        type_name = self.get_setting_type(setting_name)

        if type_name == 'float':
            value = float(value)
        elif type_name == 'integer':
            value = int(value)
        elif type_name == '':
            print(f'Error! No setting named: {setting_name}')
            return

        print(f'Changing {setting_name} to its new value {value}')
        try:
            self.mmc.setProperty(self._device_name, setting_name, value)
        except RuntimeError as e:
            print('Error! The set value likely out of range.')
            print(e)

    def acquire_single(self, exposure_time, save, subdir):
        '''
        Acquire a single image.

        exposure_time   Exposure time in seconds
        save            'True' or 'False'
        subdir          Subdirectory for saving
        '''
        exposure_time = float(exposure_time)

        # Seconds to milliseconds
        self.mmc.setExposure(exposure_time*1000)

        #binning = '2x2'
        #self.set_binning(binning)

        start_time = str(datetime.datetime.now())

        self.mmc.snapImage()
        image = self.mmc.getImage()

        metadata = {'exposure_time_s': exposure_time,
                    'function': 'acquireSingle',
                    'start_time': start_time}

        # NOTE(review): subdir is accepted but not forwarded here
        self._post_acquire_single(image, metadata, save)

    def acquire_series(self, exposure_time, image_interval, N_frames,
                       label, subdir):
        '''
        Acquire a series of images

        exposure_time       How many seconds to expose each image
        image_interval      How many seconds to wait in between the exposures
        N_frames            How many images to take
        label               Label for saving the images (part of the filename later)
        subdir
        '''
        exposure_time = float(exposure_time)
        image_interval = float(image_interval)
        N_frames = int(N_frames)
        label = str(label)

        print("Now aquire_series with label " + label)
        print("- IMAGING PARAMETERS -")
        print(" exposure time " + str(exposure_time) + " seconds")
        print(" image interval " + str(image_interval) + " seconds")
        print(" N_frames " + str(N_frames))
        print("- CAMERA SETTINGS")

        #self.set_binning('2x2')
        #print(" Pixel binning 2x2")

        device_name = self.mmc.getDeviceName(self._device_name)
        print(f'Device name {device_name}')

        #if 'hamamatsu' in device_name.lower():
        #    if trigger_direction == 'send':
        #        print(" Camera sending a trigger pulse")
        #        self.mmc.setProperty(self._device_name, "OUTPUT TRIGGER KIND[0]","EXPOSURE")
        #        self.mmc.setProperty(self._device_name, "OUTPUT TRIGGER POLARITY[0]","NEGATIVE")
        #    elif trigger_direction== 'receive':
        #        print(" Camera recieving / waiting for a trigger pulse")
        #        self.mmc.setProperty(self._device_name, "TRIGGER SOURCE","EXTERNAL")
        #        self.mmc.setProperty(self._device_name, "TriggerPolarity","POSITIVE")
        #    elif trigger_direction == 'none':
        #        pass
        #    else:
        #        raise ValueError('trigger_direction has to be send, receive or none, not {trigger_direction}')

        print("Circular buffer " + str(self.mmc.getCircularBufferMemoryFootprint()) + " MB")

        # The actual exposure (in milliseconds) is the requested
        # exposure multiplied by the exposure_time_scaler setting
        scaler = float(self.settings['exposure_time_scaler'])
        exposure = scaler*exposure_time*1000

        self.mmc.setExposure(exposure)

        self.mmc.clearCircularBuffer()
        #self.mmc.prepareSequenceAcquisition(self._device_name)

        self.wait_for_client()

        start_time = str(datetime.datetime.now())

        # NOTE(review): image_interval is in seconds and
        # (1-scaler)*exposure in milliseconds - confirm the unit
        # startSequenceAcquisition expects for the interval
        self.mmc.startSequenceAcquisition(
                N_frames, image_interval+(1-scaler)*exposure, False)

        while self.mmc.isSequenceRunning():
            self.mmc.sleep(1000*exposure_time)

        self.mmc.sleep(1000)

        images = []
        for _ in range(N_frames):
            while True:
                try:
                    image = self.mmc.popNextImage()
                    break
                except Exception:
                    # Index error for example when circular buffer
                    # is still empty; wait and retry
                    self.mmc.sleep(1000*exposure_time)
            images.append(image)

        metadata = {'exposure_time_s': exposure_time,
                    'image_interval_s': image_interval,
                    'N_frames': N_frames,
                    'label': label,
                    'function': 'acquireSeries',
                    'start_time': start_time}
        metadata.update(self.settings)

        # NOTE(review): subdir is accepted but not forwarded here
        self._post_acquire_series(images, metadata, label)

        #if 'hamamatsu' in device_name.lower() and trigger_direction == 'receive':
        #    self.mmc.setProperty(self._device_name, "TRIGGER SOURCE","INTERNAL")

        print('acquired')

    def set_binning(self, binning):
        '''
        Binning '2x2' for example.
        '''
        # Touch the device only when the binning actually changes
        if not self.settings['binning'] == binning:
            self.mmc.setProperty(self._device_name, 'Binning', binning)
        super().set_binning(binning)

    def set_roi(self, x, y, w, h):
        '''
        In binned pixels
        roi     (x,y,w,h) or None

        Zero width or height clears the ROI.
        '''
        super().set_roi(x, y, w, h)

        x = int(x)
        y = int(y)
        w = int(w)
        h = int(h)

        if w == 0 or h == 0:
            self.mmc.clearROI()
        else:
            self.mmc.setROI(x, y, w, h)
Methods
def acquire_series(self, exposure_time, image_interval, N_frames, label, subdir)-
Acquire a series of images
exposure_time How many seconds to expose each image image_interval How many seconds to wait in between the exposures N_frames How many images to take label Label for saving the images (part of the filename later) subdir
def acquire_single(self, exposure_time, save, subdir)-
Acquire a single image.
save 'True' or 'False' subdir Subdirectory for saving
def get_camera(self)-
Returns the label of the current camera.
def get_cameras(self)-
Lists available MicroManager configuration files (.cfg) in DEFAULT_MICROMANAGER_DIR.
def get_setting(self, setting_name)def get_setting_type(self, setting_name)-
Returns "string", "integer" or "float".
Returns an empty string if the setting does not exist.
def get_settings(self)-
Returns device property names
def set_camera(self, name)-
Set the provided configuration file.
def set_setting(self, setting_name, value)
Inherited members