Commit 9cc922de authored by Yuelong Yu's avatar Yuelong Yu
Browse files

train builder format test

- fixed train builder format issues
- added example to test udp packages
- added digital and analog live image
parent 88fc421d
......@@ -2,4 +2,5 @@
build
bin
.DS_store
.vscode
// author : Yuelong Yu
// compilation : g++ -g --std=c++11 train_format_test.cpp -o test -lboost_system
#include <iostream>
#include <cinttypes>
#include <string>
#include <boost/asio.hpp>
using boost::asio::ip::udp;
using namespace std;
int main(int argc, char* argv[])
{
if(argc != 3)
cout<<"Please input the IP address and port number."<<endl
<<"Example: $./udpclient 127.0.0.1 1234"<<endl;
else
{
string ip = string(argv[1]);
int32_t port = atol(argv[2]);
cout<<"Listening.."<<endl;
cout<<"IP:"<<ip<<"\nPort:"<<port<<endl;
char chbuffer[10000]="";
boost::asio::io_service my_io_service;
// udp::endpoint endpoint(udp::v4(), 4321);
udp::endpoint endpoint(boost::asio::ip::address::from_string(ip), port);
udp::socket socket(my_io_service, endpoint);
int32_t rec = 0;
int32_t total_rec=0;
int32_t counter = 0;
int32_t image = 0;
int32_t frame_nr = 0;
uint32_t packet_nr = 0;
int32_t train_nr = 0;
bool sof = false;
bool eof = false;
uint64_t frame_nr_in_train = 0;
int32_t first_frame = 0;
string TRAIN_HEADER_CHECK="\x58\x54\x44\x46\xBE\xEF\xFA\xCE";
std::reverse(TRAIN_HEADER_CHECK.begin(),TRAIN_HEADER_CHECK.end());
while (true)
{
int32_t ret = socket.available();
if(ret > 0)
{
counter++;
//std::cout<<"counter:"<<counter<<" - length in bytes:"<<ret<<std::endl;
rec = socket.receive_from(boost::asio::buffer(chbuffer),endpoint);
if(std::string(chbuffer,8) == TRAIN_HEADER_CHECK)
{
train_nr++;
int32_t major_version = -1;
int32_t minor_version = -1;
memcpy(&major_version,chbuffer+8,sizeof(int32_t));
memcpy(&minor_version,chbuffer+12,sizeof(int32_t));
std::cout<<"==========\ntrain format\nnr is:"
<<train_nr
<<"version:"<<major_version<<"."<<minor_version
<<std::endl;
}
frame_nr = (((int32_t)chbuffer[rec-5])<<24)
+(((int32_t)chbuffer[rec-6])<<16)
+(((int32_t)chbuffer[rec-7])<<8)
+((int32_t)chbuffer[rec-8]);
packet_nr = (((int32_t)chbuffer[rec-1])<<24)
+(((int32_t)chbuffer[rec-2])<<16)
+(((int32_t)chbuffer[rec-3])<<8)
+((int32_t)chbuffer[rec-4]);
sof= ((((uint8_t)chbuffer[rec-5]) & 0x80) >> 7) & 0x1;
eof= ((((uint8_t)chbuffer[rec-5]) & 0x40) >> 6) & 0x1;
// if(sof)
// {
first_frame = frame_nr;
//get image numbers in the train
frame_nr_in_train = (((uint64_t)chbuffer[47])<<56)
+(((uint64_t)chbuffer[46])<<48)
+(((uint64_t)chbuffer[45])<<40)
+(((uint64_t)chbuffer[44])<<32)
+(((uint64_t)chbuffer[43])<<24)
+(((uint64_t)chbuffer[42])<<16)
+(((uint64_t)chbuffer[41])<<8)
+((uint64_t)chbuffer[40]);
std::cout<<"counter:"<<counter<<"||total_rec:"<<total_rec<<"||rec:"<<rec<<"||ret:"<<ret<<"||image:"<<image<<"||frame_nr"<<frame_nr<<"||packet_nr"<<packet_nr
<<"||sof:"<<sof
<<"||eof:"<<eof
<<"||frame_nr_in_train:"<<frame_nr_in_train<<std::endl;
total_rec+=rec;
if(rec==2056)
image++;
}
}
}
return 0;
}
\ No newline at end of file
......@@ -37,6 +37,7 @@ namespace FSDataRecvNS
typedef unique_ptr<NetworkInterface> uptr_network;
const int16 CHAR_BUFFER = 9000;
const int32 AGIPD_IMAGE_SIZE_IN_BYTES = 131072;
// train header format
const string TRAIN_HEADER_CHECK="\xCE\xFA\xEF\xBE\x46\x44\x54\x58";
......
......@@ -49,7 +49,8 @@ namespace FSDataRecvNS
m_buffer_length = buffer_length;
m_max_frame_numbers = m_buffer_length;
m_live_image = shared_ptr<int32>(new int32[m_img_size]);
m_live_image = shared_ptr<int16>(new int16[m_img_size]);
m_live_image_analog = shared_ptr<int16>(new int16[m_img_size]);
m_mempool = sptr_mem16bit(new MemPool<int16>(buffer_length,m_img_size));
......@@ -177,7 +178,7 @@ namespace FSDataRecvNS
{
LOG_TRACE(__FUNCTION__);
std::fill(m_live_image.get(),m_live_image.get()+m_img_size,0);
//std::fill(m_live_image.get(),m_live_image.get()+m_img_size,0);
m_status = BUSY;
m_mempool->Reset();
......@@ -208,14 +209,25 @@ namespace FSDataRecvNS
LOG_TRACE(__FUNCTION__);
m_receiver_task->SetLiveViewInterval(interval);
}
void DataReceiver::GetLiveImageData(shared_ptr<int32>& img_data,
void DataReceiver::GetLiveImageDataAnalog(shared_ptr<int16>& img_data,
int32& frame_number,
int16& error_code)
{
LOG_TRACE(__FUNCTION__);
m_receiver_task->GetLiveImageDataAnalog(m_live_image_analog,frame_number,error_code);
img_data = m_live_image_analog;
}
void DataReceiver::GetLiveImageDataDigital(shared_ptr<int16>& img_data,
int32& frame_number,
int16& error_code)
{
LOG_TRACE(__FUNCTION__);
m_receiver_task->GetLiveImageData(m_live_image,frame_number,error_code);
m_receiver_task->GetLiveImageDataDigital(m_live_image,frame_number,error_code);
img_data = m_live_image;
}
......
......@@ -139,7 +139,17 @@ namespace FSDataRecvNS
* @param frame_number frame number of live image
* @param error_code error code of live image
*/
void GetLiveImageData(shared_ptr<int32>& img_data,
void GetLiveImageDataAnalog(shared_ptr<int16>& img_data,
int32& frame_number,
int16& error_code);
/**
* @brief get live image data
* @param img_data data of live image
* @param frame_number frame number of live image
* @param error_code error code of live image
*/
void GetLiveImageDataDigital(shared_ptr<int16>& img_data,
int32& frame_number,
int16& error_code);
......@@ -163,7 +173,7 @@ namespace FSDataRecvNS
uptr_threads m_threadpool;
sptr_mem16bit m_mempool;
shared_ptr<int32> m_live_image;
shared_ptr<int16> m_live_image,m_live_image_analog;
static unique_ptr<DataReceiver> m_data_receiver;
ReceiverTask* m_receiver_task;
......
......@@ -154,6 +154,7 @@ namespace FSDataRecvNS
{
vector<int16> img_data(img,img+m_x*m_y);
LOG_INFOS("write data:" + to_string(frame_no));
WriteData(img_data,frame_no,error_code);
lock.lock();
......
......@@ -31,17 +31,22 @@ namespace FSDataRecvNS
int32 img_size,
sptr_mem16bit const& mempool)
:m_mempool16bit(mempool),
m_img_size(img_size),
m_received_imgs(0),
m_processed_frames_in_one_train(0),
m_frames_in_one_train(0),
m_img_size(img_size),
m_packets_collected(0),
m_x(x),
m_y(y),
m_liveviewinterval(-1),
m_collected_bytes(0),
m_processing_frame_nr(-1),
m_packet(),
m_img_data(img_size,0),
m_reordered_img(img_size,0),
m_liveimg(img_size,0)
m_liveimg(img_size,0),
m_liveimg_analog(img_size,0),
m_train_format(false)
{
LOG_TRACE(__FUNCTION__);
......@@ -80,7 +85,22 @@ namespace FSDataRecvNS
m_liveviewinterval = interval;
}
void ReceiverTask::GetLiveImageData(shared_ptr<int32> const& img_data,
void ReceiverTask::GetLiveImageDataAnalog(shared_ptr<int16> const& img_data,
int32& frame_number,
int16& error_code)
{
LOG_TRACE(__FUNCTION__);
boost::unique_lock<boost::mutex> lock(m_bstSync);
//img_data = m_liveimg;
std::copy(m_liveimg_analog.begin(),m_liveimg_analog.end(),img_data.get());
frame_number = m_current_frame_number;
error_code = 0;//not used for live images
lock.unlock();
}
void ReceiverTask::GetLiveImageDataDigital(shared_ptr<int16> const& img_data,
int32& frame_number,
int16& error_code)
{
......@@ -119,27 +139,34 @@ namespace FSDataRecvNS
LOG_INFOS("Get requested images : "+to_string(m_requested_frames));
m_received_imgs = 0;
m_packets_collected = 0;
m_processing_frame_nr = -1;
m_current_frame_number = 0;
m_processed_frames_in_one_train = 0;
}
lock.unlock();
if(m_mempool16bit->GetTotalReceivedFrames() == m_requested_frames)
{
LOG_INFOS("Get enough images stop : "+to_string(m_requested_frames));
Stop();
continue;
}
//error_code = m_udpconn->ReceivePacket((char*)(&m_packet),packet_size);
while(m_udpconn->ReceivePacket((char*)(&m_packet),packet_size) != -1)
{
// if(packet_size != 8192)
// LOG_STREAM(__FUNCTION__,ERROR,"packet size error : " + to_string(packet_size));
if(m_packets_collected == 0 && IsTrainFormat(m_packet))
if(m_packets_collected == 0)
m_train_format = IsTrainFormat(m_packet);
if(m_train_format)
BuildTrainImage(m_packet,packet_size);
else
BuildImage(m_packet);
if(m_mempool16bit->GetTotalReceivedFrames() == m_requested_frames)
{
LOG_INFOS("Get enough images stop : "+to_string(m_requested_frames));
Stop();
break;
}
}
}
}
......@@ -149,20 +176,14 @@ namespace FSDataRecvNS
LOG_TRACE(__FUNCTION__);
string header = string(packet_data.data,8);
if(header == TRAIN_HEADER_CHECK)
if(header == TRAIN_HEADER_CHECK)
{
LOG_INFOS("train format");
return true;
}
LOG_INFOS("not train format");
return false;
}
void ReceiverTask::BuildTrainImage(packet_struct const& packet_data,szt packet_size)
{
bool sof = ((((uchar)packet_data.data[packet_size-1]) & 0x80) >> 7) & 0x1;
bool eof = ((((uchar)packet_data.data[packet_size-1]) & 0x40) >> 6) & 0x1;
m_current_frame_number = (((uint64)packet_data.data[47]& 0xff)<<56)
int32 major_version = -1;
int32 minor_version = -1;
memcpy(&major_version,packet_data.data+8,sizeof(int32));
memcpy(&minor_version,packet_data.data+12,sizeof(int32));
m_frames_in_one_train = (((uint64)packet_data.data[47]& 0xff)<<56)
+(((uint64)packet_data.data[46]& 0xff)<<48)
+(((uint64)packet_data.data[45]& 0xff)<<40)
+(((uint64)packet_data.data[44]& 0xff)<<32)
......@@ -171,8 +192,47 @@ namespace FSDataRecvNS
+(((uint64)packet_data.data[41]& 0xff)<<8)
+((uint64)packet_data.data[40]& 0xff);
LOG_INFOS("train format,version:"
+ to_string(major_version) + "." + to_string(minor_version)
+"frames in one train:" + to_string(m_frames_in_one_train));
return true;
}
else
{
LOG_INFOS("This is not train format");
return false;
}
}
void ReceiverTask::BuildTrainImage(packet_struct const& packet_data,szt packet_size)
{
int bytes_to_copy = 0;
if(sof)
int32 frame_nr = (((int32)packet_data.data[packet_size-5])<<24)
+(((int32)packet_data.data[packet_size-6])<<16)
+(((int32)packet_data.data[packet_size-7])<<8)
+((int32)packet_data.data[packet_size-8]);
LOG_STREAM(__FUNCTION__,TRACE,"current frame nr:" + to_string(frame_nr));
if(m_processing_frame_nr == -1)
m_processing_frame_nr = frame_nr;
if(m_processing_frame_nr != frame_nr)
{
LOG_STREAM(__FUNCTION__,ERROR,"train broken,restart process. Processing frame nr: " + to_string(m_processing_frame_nr) + " - frame nr:" + to_string(frame_nr));
short error_code = 1;
++m_received_imgs;
m_mempool16bit->SetImage(m_img_data.data(),m_received_imgs,error_code);
m_packets_collected = 0;
m_collected_bytes = 0;
m_packets_collected = 0;
m_processed_frames_in_one_train = 0;
//m_received_imgs = 0;
}
if(m_packets_collected == 0)
{
bytes_to_copy = packet_size
- 64 //train header size
......@@ -183,36 +243,99 @@ namespace FSDataRecvNS
bytes_to_copy);
m_collected_bytes += bytes_to_copy;
m_packets_collected++;
LOG_INFOS("first packet processed, frame nr:" + to_string(frame_nr));
}
else
{
m_packets_collected++;
int remain_bytes = AGIPD_IMAGE_SIZE_IN_BYTES - m_collected_bytes;
bytes_to_copy = packet_size
- 8; // trailer size,no header
memcpy(&m_img_data[m_collected_bytes/2],
&(packet_data.data[0]),
bytes_to_copy);
m_collected_bytes += bytes_to_copy;
if(eof)
if(remain_bytes <= bytes_to_copy)
{
LOG_INFOS("last packet, copy image into buffer" + to_string(m_current_frame_number));
LOG_INFOS("received packets:" + to_string(m_packets_collected));
++m_received_imgs;
memcpy(&m_img_data[m_collected_bytes/2],
&(packet_data.data[0]),
remain_bytes);
short error_code = 0;
++m_received_imgs;
++m_processed_frames_in_one_train;
m_collected_bytes += remain_bytes;
if(m_collected_bytes != 131072) // expected image size is 131072
LOG_STREAM(__FUNCTION__,ERROR,"collected bytes error" + to_string(m_collected_bytes)) ;
int32 col = 0;
int32 row = 0;
for(int32 i=0; i<m_img_size; ++i)
{
m_reordered_img[(col+m_x*row)]=m_img_data[i];
++row;
if(row == m_y)
{
row = 0;
++col;
}
}
m_mempool16bit->SetImage(m_reordered_img.data(),m_received_imgs,error_code);
m_mempool16bit->SetImage(m_img_data.data(),m_current_frame_number,error_code);
}
}
++m_packets_collected;
// update digital image
if(m_received_imgs % m_liveviewinterval == 0 && m_liveviewinterval != -1)
{
boost::unique_lock<boost::mutex> lock(m_bstSync);
std::copy(m_reordered_img.begin(),m_reordered_img.end(),m_liveimg.begin());
lock.unlock();
}
// update analog image
if(m_received_imgs % (m_liveviewinterval+1) == 0 && m_liveviewinterval != -1)
{
boost::unique_lock<boost::mutex> lock(m_bstSync);
std::copy(m_reordered_img.begin(),m_reordered_img.end(),m_liveimg_analog.begin());
lock.unlock();
}
LOG_INFOS("one image is finished,received images:"
+ to_string(m_received_imgs)
+ "package collected:"
+ to_string(m_packets_collected)
+ "bytes colltected:"
+ to_string(m_collected_bytes));
m_collected_bytes = 0;
if(m_processed_frames_in_one_train == m_frames_in_one_train*2)
{
LOG_INFOS("all images in this train are saved");
++m_processing_frame_nr;
m_collected_bytes = 0;
m_packets_collected = 0;
m_processed_frames_in_one_train = 0;
}
else
{
int new_image_bytes = bytes_to_copy-remain_bytes;
if(new_image_bytes != 0)
{
LOG_INFOS("copy remain:"+to_string(new_image_bytes)+" to new image");
memcpy(&m_img_data[m_collected_bytes/2],
&(packet_data.data[0]) + remain_bytes,
new_image_bytes);
m_collected_bytes += new_image_bytes;
}
}
}
else
{
memcpy(&m_img_data[m_collected_bytes/2],
&(packet_data.data[0]),
bytes_to_copy);
m_collected_bytes += bytes_to_copy;
LOG_STREAM(__FUNCTION__,TRACE,"copy bytes:" + to_string(bytes_to_copy));
}
}
}
void ReceiverTask::BuildImage(packet_struct const& packet_data)
{
......
......@@ -94,7 +94,17 @@ namespace FSDataRecvNS
* @param frame_number frame No.
* @param error_code
*/
void GetLiveImageData(shared_ptr<int32> const& img_data,
void GetLiveImageDataAnalog(shared_ptr<int16> const& img_data,
int32& frame_number,
int16& error_code);
/**
* @brief get live image
* @param img_data image data
* @param frame_number frame No.
* @param error_code
*/
void GetLiveImageDataDigital(shared_ptr<int16> const& img_data,
int32& frame_number,
int16& error_code);
......@@ -126,15 +136,23 @@ namespace FSDataRecvNS
*/
void BuildImage(packet_struct const& packet_data);
sptr_mem16bit m_mempool16bit;
uptr_network m_udpconn;
int64 m_current_frame_number;
uint64 m_current_frame_number,m_received_imgs,m_processed_frames_in_one_train,m_frames_in_one_train;
int32 m_img_size,m_received_imgs,m_requested_frames,m_packets_collected,m_x,m_y,m_liveviewinterval,m_collected_bytes;
int32 m_img_size,
m_requested_frames,
m_packets_collected,
m_x,
m_y,
m_liveviewinterval,
m_collected_bytes,
m_processing_frame_nr;
packet_struct m_packet;
vector<int16> m_img_data,m_reordered_img,m_liveimg;
vector<int16> m_img_data,m_reordered_img,m_liveimg,m_liveimg_analog;
bool m_train_format;
};
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment