Commit 3f604e76 authored by Yuelong Yu's avatar Yuelong Yu
Browse files

finished data receiver task

parent 45142c48
......@@ -24,15 +24,18 @@
#include <fsdetector/core/Globals.h>
#include <fsdetector/core/ThreadUtils.h>
#include <fsdetector/core/MemUtils.h>
#include <boost/asio.hpp>
#include <fsdetector/core/NetworkInterface.h>
#include <fsdetector/core/NetworkImplementation.h>
namespace FSDataRecvNS
{
using namespace FSDetCoreNS;
using boost::asio::ip::udp;
//using boost::asio::ip::udp;
typedef unique_ptr<ThreadPool> uptr_threads;
typedef shared_ptr<MemPool<int16>> sptr_mem16bit;
typedef unique_ptr<NetworkInterface> uptr_network;
const int16 CHAR_BUFFER = 9000;
}
......@@ -38,6 +38,8 @@ namespace FSDataRecvNS
m_buffer_length = buffer_length;
m_live_image = shared_ptr<int32>(new int32[m_img_size]);
m_mempool = sptr_mem16bit(new MemPool<int16>(buffer_length,m_img_size));
//create 6 threads
m_threadpool = uptr_threads(new ThreadPool(6));
......@@ -78,12 +80,7 @@ namespace FSDataRecvNS
m_receiver_task->SetRequestFrames(m_requested_frames);
m_filesaving_task->SetRequestFrames(m_requested_frames);
}
void DataReceiver::SetFramesPerFile(int32 frames_per_file)
{
m_frames_per_file = frames_per_file;
}
int32 DataReceiver::GetReceivedImages()
{
if(m_save_file)
......@@ -101,21 +98,21 @@ namespace FSDataRecvNS
{
m_save_file = save_data;
}
void DataReceiver::SetFileParameter(string file_name,
bool compression_enabled,
bool shuffle_enabled,
int16 compression_ratio,
bool file_split,
int32 frames_per_file)
void DataReceiver::SetFileName(string file_name)
{
m_file_name = file_name;
m_compression_enabled = compression_enabled;
m_shuffle_enabled = shuffle_enabled;
m_compression_ratio = compression_ratio;
m_file_split = file_split;
}
void DataReceiver::SetFramesPerFile(int32 frames_per_file)
{
m_frames_per_file = frames_per_file;
}
void DataReceiver::SetFileStructureWithXML(string file_structure)
{
if(m_save_file)
m_filesaving_task->SetFileStructureWithXML(file_structure);
}
Enum_detector_state DataReceiver::GetStatus()
{
......
......@@ -60,14 +60,7 @@ namespace FSDataRecvNS
* @param frame_number frame numbers
*/
void SetFrameNumbers(int32 frame_number);
/**
* @brief set frame per file
* @param frames_per_file frames per file.
* After specific numbers, the file is splitted.
*/
void SetFramesPerFile(int32 frames_per_file);
/**
* @brief get received images
* @return received images
......@@ -88,33 +81,23 @@ namespace FSDataRecvNS
void SetFileSaving(bool save_data);
/**
* @brief set file parameters
* @brief set file name
* @param file_name file name
* @param compression_enabled true: compression is enabled
* @param shuffle_enabled true: data shuffle is enabled during compression
* @param compression_ratio compression ration from 0-9
* @param file_split not implemented
* @param frames_per_file not implemented
*/
void SetFileParameter(string file_name,
bool compression_enabled,
bool shuffle_enabled,
int16 compression_ratio,
bool file_split,
int32 frames_per_file);
void SetFileName(string file_name);
/**
* @brief set frame per file
* @param frames_per_file frames per file.
* After specific numbers, the file is splitted.
*/
void SetFramesPerFile(int32 frames_per_file);
/**
* @brief set file structur with xml
* @param XML structure for Nexus file
*/
void SetFileStructureWithXML(string file_structure);
/**
* @brief set file name
* @param file_name file name
*/
void SetFileName(string file_name);
/**
* @brief get status of detector
......
......@@ -29,9 +29,6 @@ namespace FSDataRecvNS
int32 img_size,
sptr_mem16bit const& mempool)
:m_mempool16bit(mempool),
m_io(),
m_end_point(boost::asio::ip::address::from_string(udp_ip), port),
m_udp(m_io,m_end_point),
m_img_size(img_size),
m_received_imgs(0),
m_packets_collected(0),
......@@ -39,25 +36,19 @@ namespace FSDataRecvNS
m_img_data(img_size,0)
{
LOG_TRACE(__FUNCTION__);
boost::asio::socket_base::receive_buffer_size option;
m_udp.get_option(option);
uint32 size = option.value();
size *= 10;
option = boost::asio::socket_base::receive_buffer_size(size);
m_udp.set_option(option);
//m_udp.get_option(option);
m_udpconn = uptr_network(new NetworkUDPInterface(new NetworkUDPImplementation(udp_ip,port)));
//cout<<"size"<<size<<"-"<<option.value()<<endl;
//m_packet = shared_ptr<char>(new char[CHAR_BUFFER]);
if(m_udpconn->Connect() != 0)
LOG_STREAM(__FUNCTION__,ERROR,"cannot bind to udp socket" + udp_ip + to_string(port));
}
ReceiverTask::~ReceiverTask()
{
LOG_TRACE(__FUNCTION__);
m_udpconn.reset();
}
void ReceiverTask::SetRequestFrames(int32 requested_frames)
......@@ -82,11 +73,43 @@ namespace FSDataRecvNS
void ReceiverTask::DoTaskAction()
{
LOG_TRACE(__FUNCTION__);
int32 frame_no;
int16 error_code;
szt packet_size;
while(1)
{
usleep(100);
boost::unique_lock<boost::mutex> lock(m_bstSync);
if(!m_fStart)
{
m_udpconn->ClearDataInSocket();
m_bstCond.wait(lock);
}
if(m_fExit)
break;
lock.unlock();
if(m_mempool16bit->GetTotalReceivedFrames() >= m_requested_frames)
{
Stop();
continue;
}
error_code = m_udpconn->ReceivePacket((char*)(&m_packet),packet_size);
if(error_code != -1)
BuildImage(m_packet);
}
}
void ReceiverTask::BuildImage(packet_struct const& packet_data)
{
LOG_TRACE(__FUNCTION__);
++m_received_imgs;
}
}
......@@ -47,34 +47,63 @@ namespace FSDataRecvNS
};
#pragma pack(pop)
const uint16_t CHAR_BUFFER = 9000;
class ReceiverTask:public Task
{
public:
/**
* @brief constructor
* @param udp_ip ip address
* @param port port
* @param img_size imge size
* @param mempool img buffer
*/
ReceiverTask(string udp_ip,
int16 port,
int32 img_size,
sptr_mem16bit const& mempool);
/**
* @brief destructor
*/
~ReceiverTask();
/**
* @brief set expected frames for the acquisition
* @param requested_frames expected frames
*/
void SetRequestFrames(int32 requested_frames);
/**
* @brief get received images
* @return received images
*/
int32 GetReceivedImages();
/**
* @brief get live image
* @param img_data image data
* @param frame_number frame No.
* @param error_code
*/
void GetLiveImageData(shared_ptr<int32> const& img_data,
int32& frame_number,
int16& error_code);
/**
* @brief receive data
*/
void DoTaskAction();
private:
/**
* @brief build image from UDP packet
* @param packet UDP packet
*/
void BuildImage(packet_struct const& packet_data);
sptr_mem16bit m_mempool16bit;
uptr_network m_udpconn;
boost::asio::io_service m_io;
udp::endpoint m_end_point;
udp::socket m_udp;
int64 m_current_frame_number;
int32 m_img_size,m_received_imgs,m_requested_frames,m_packets_collected;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment