Jump to content
  • 0

Request for help with long-term fast data acquisition for USB-205


Question

Posted

Hi,

I need to run data acquisition with a USB-205 at 10 kS/s on three analog input channels for at least 3 hours, with the first column holding the date and time, similar to the attached code, which is not hardware-paced. Could you please help? I am not sure about the buffer allocation, etc. I am using a normal laptop with 16 GB of RAM, so I am not sure whether there is a way to record high-resolution data without crashing the laptop.

Many thanks,

B

Digilent_DAQ_B_control.py

1 answer to this question

Recommended Posts

  • 0
Posted

Hello, 

The following Python for Windows program demonstrates how to collect data with a USB-205. The while loop that reads the data controls how long it will run. For simplicity, it does not write to a file; instead, it prints one row of data from each buffer read. Use your Python skills to modify it by adding a routine that writes a list, along with a timestamp, to a file. Note that very little memory is used, because you do not need a bigger buffer to read data continuously. Copy this example (attached) to the Console example program folder.

best regards,

John

 

 

from __future__ import absolute_import, division, print_function

import time

from builtins import *  # @UnusedWildImport
from time import sleep
from ctypes import cast, POINTER, c_double, c_ushort, c_ulong

from mcculw import ul
from mcculw.enums import ScanOptions, FunctionType, Status
from mcculw.ul import ULError, a_input_mode
from mcculw.enums import InterfaceType
from mcculw.enums import ULRange
from mcculw.enums import AnalogInputMode

# When True, run_example() ignores InstaCal, searches the USB device inventory
# for the target board and creates/releases it itself. When False, board
# number 0 is used as-is.
use_device_detection = True


def run_example():
    """Run a continuous background analog-input scan and print one row per half buffer.

    When ``use_device_detection`` is true, the first USB device whose name is
    "USB-205" is located and created as a UL board; otherwise board 0 is used.
    A circular buffer holding one second of data is allocated, a
    CONTINUOUS | BACKGROUND | SCALEDATA scan is started, and the buffer is
    drained in two halves (ping-pong): whenever the driver's write index
    crosses into the other half, the half that was just completed is copied
    into a separate array and its first scan is printed together with the
    driver's index, total count, and the count delta since the previous read.

    The loop stops when the scan goes idle or the total count reaches the
    limit below. The scan is always stopped before the buffer is freed, and
    the device is released on every exit path after discovery succeeded.

    Returns:
        None. Results and errors are reported with ``print``.
    """
    board_num = 0
    find_device = "USB-205"
    if use_device_detection:
        board_num = -1
        ul.ignore_instacal()
        dev_list = ul.get_daq_device_inventory(InterfaceType.USB)
        if not dev_list:
            print("No devices detected")
            return

        for board_index, device in enumerate(dev_list):
            if str(device) == find_device:
                print(f"Found {find_device} board number = {board_index}")
                print(f"Serial number: {device.unique_id}")
                print(f"Product type: {hex(device.product_id)}")
                board_num = board_index
                ul.create_daq_device(board_num, device)
                # Only one board is used and released below, so stop at the
                # first match instead of creating every matching device.
                break

        if board_num == -1:
            print(f"Device {find_device} not found")
            return
    # **********End of Discovery************

    rate = 10000
    # Buffer sized to hold one second of data. Make larger if your data
    # handling requires more. Keep points_per_channel even so that each half
    # of the buffer starts on a channel-scan boundary.
    points_per_channel = 10000
    low_chan = 0
    high_chan = 3
    num_chans = high_chan - low_chan + 1
    # Loop exit threshold, compared against the count reported by get_status.
    max_count = 200000

    total_count = points_per_channel * num_chans
    half_count = total_count // 2
    # The SCALEDATA option returns volts instead of A/D counts
    scan_options = ScanOptions.CONTINUOUS | ScanOptions.BACKGROUND | ScanOptions.SCALEDATA

    memhandle = None
    scan_running = False
    try:
        memhandle = ul.scaled_win_buf_alloc(total_count)
        # Check if the buffer was successfully allocated
        if not memhandle:
            print("Failed to allocate memory.")
            return

        # Separate destination for one half of the circular buffer. Copying
        # into a pointer cast from memhandle itself would overwrite the live
        # DAQ buffer while the background scan is still writing to it.
        half_buf = (c_double * half_count)()

        # Start the scan
        ul.a_in_scan(
            board_num, low_chan, high_chan, total_count,
            rate, ULRange.BIP10VOLTS, memhandle, scan_options)
        scan_running = True

        # Create a format string that aligns the data in columns:
        # one per channel plus three for index, count and diff
        row_format = "{:8}" * (num_chans + 3)

        # Print the channel name headers
        labels = ["CH" + str(ch_num) + "\t"
                  for ch_num in range(low_chan, high_chan + 1)]
        labels.extend(["index\t", "count\t", "diff"])
        print(row_format.format(*labels))

        # Boolean flag used to toggle reading the lower and upper half
        read_lower = True
        # Count reported at the previous half-buffer read, for the diff column
        last_count = 0

        status, curr_count, curr_index = ul.get_status(
            board_num, FunctionType.AIFUNCTION)

        while status != Status.IDLE and curr_count < max_count:
            # Make sure a data point is available for display.
            if curr_count > 0:
                # A half is safe to read once the driver's index has moved
                # into the other half.
                first_point = None
                if read_lower and curr_index > half_count:
                    first_point = 0
                elif not read_lower and curr_index < half_count:
                    first_point = half_count

                if first_point is not None:
                    ul.scaled_win_buf_to_array(
                        memhandle, half_buf, first_point, half_count)
                    diff = curr_count - last_count
                    last_count = curr_count

                    # First scan of the half just read, then the bookkeeping
                    # columns announced in the header.
                    fields = [f"{value:.3f}" for value in half_buf[:num_chans]]
                    fields.extend(
                        f"{value:d}" for value in (curr_index, curr_count, diff))
                    print("\t ".join(fields))
                    read_lower = not read_lower

            sleep(0.1)
            status, curr_count, curr_index = ul.get_status(
                board_num, FunctionType.AIFUNCTION)

        # Stop the background operation (this is required even if the
        # scan completes successfully)
        ul.stop_background(board_num, FunctionType.AIFUNCTION)
        scan_running = False

        print("Scan completed successfully.")
    except ULError as e:
        print(e.message)
    finally:
        # If the loop was left by an exception the scan may still be running;
        # stop it before the buffer it writes into is freed.
        if scan_running:
            try:
                ul.stop_background(board_num, FunctionType.AIFUNCTION)
            except ULError as e:
                print(e.message)

        # Free the buffer in a finally block to prevent errors from causing
        # a memory leak.
        if memhandle:
            ul.win_buf_free(memhandle)

        if use_device_detection:
            ul.release_daq_device(board_num)


# Run the example only when this file is executed as a script, not on import.
if __name__ == '__main__':
    run_example()
 

a_in_scan_USB200.py

Create an account or sign in to comment

You need to be a member in order to leave a comment

Create an account

Sign up for a new account in our community. It's easy!

Register a new account

Sign in

Already have an account? Sign in here.

Sign In Now
×
×
  • Create New...