1010#include " dfmodules/CommonIssues.hpp"
1111#include " dfmodules/opmon/DataWriter.pb.h"
1212
13- #include " confmodel/Application.hpp"
14- #include " confmodel/Session.hpp"
13+ #include " appmodel/DataStoreConf.hpp"
1514#include " appmodel/DataWriterModule.hpp"
1615#include " appmodel/TRBModule.hpp"
17- #include " appmodel/DataStoreConf .hpp"
16+ #include " confmodel/Application .hpp"
1817#include " confmodel/Connection.hpp"
18+ #include " confmodel/Session.hpp"
1919#include " daqdataformats/Fragment.hpp"
2020#include " dfmessages/TriggerDecision.hpp"
2121#include " dfmessages/TriggerRecord_serialization.hpp"
22- #include " logging/Logging.hpp"
2322#include " iomanager/IOManager.hpp"
23+ #include " logging/Logging.hpp"
2424#include " rcif/cmd/Nljs.hpp"
2525
2626#include < algorithm>
3434/* *
3535 * @brief Name used by TRACE TLOG calls from this source file
3636 */
37- // #define TRACE_NAME "DataWriterModule" // NOLINT This is the default
37+ // #define TRACE_NAME "DataWriterModule" // NOLINT This is the default
3838enum
3939{
4040 TLVL_ENTER_EXIT_METHODS = 5 ,
@@ -77,18 +77,18 @@ DataWriterModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
7777 }
7878 if (outputs.size () != 1 ) {
7979 throw appfwk::CommandFailed (
80- ERS_HERE, " init" , get_name (), " Expected 1 output, got " + std::to_string (outputs.size ()));
80+ ERS_HERE, " init" , get_name (), " Expected 1 output, got " + std::to_string (outputs.size ()));
8181 }
8282
8383 m_module_configuration = mcfg;
8484 m_data_writer_conf = mdal->get_configuration ();
8585 m_writer_identifier = mdal->get_writer_identifier ();
8686
8787 if (inputs[0 ]->get_data_type () != datatype_to_string<std::unique_ptr<daqdataformats::TriggerRecord>>()) {
88- throw InvalidQueueFatalError (ERS_HERE, get_name (), " TriggerRecord Input queue" );
88+ throw InvalidQueueFatalError (ERS_HERE, get_name (), " TriggerRecord Input queue" );
8989 }
9090 if (outputs[0 ]->get_data_type () != datatype_to_string<dfmessages::TriggerDecisionToken>()) {
91- throw InvalidQueueFatalError (ERS_HERE, get_name (), " TriggerDecisionToken Output queue" );
91+ throw InvalidQueueFatalError (ERS_HERE, get_name (), " TriggerDecisionToken Output queue" );
9292 }
9393
9494 m_trigger_record_connection = inputs[0 ]->UID ();
@@ -102,7 +102,7 @@ DataWriterModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
102102 break ;
103103 }
104104 if (mod->class_name () == " TRMonRequestorModule" ) {
105- is_trmon = true ;
105+ is_trmon = true ;
106106 break ;
107107 }
108108 }
@@ -120,37 +120,39 @@ DataWriterModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
120120 }
121121
122122 // try to create the receiver to see test the connection anyway
123- m_tr_receiver = iom -> get_receiver<std::unique_ptr<daqdataformats::TriggerRecord>>(m_trigger_record_connection);
123+ m_tr_receiver = iom-> get_receiver <std::unique_ptr<daqdataformats::TriggerRecord>>(m_trigger_record_connection);
124124
125125 m_token_output = iom->get_sender <dfmessages::TriggerDecisionToken>(outputs[0 ]->UID ());
126-
126+
127127 TLOG_DEBUG (TLVL_ENTER_EXIT_METHODS) << get_name () << " : Exiting init() method" ;
128128}
129129
130130void
131- DataWriterModule::generate_opmon_data () {
131+ DataWriterModule::generate_opmon_data ()
132+ {
132133
133134 opmon::DataWriterInfo dwi;
134135
135136 dwi.set_records_received (m_records_received_tot.load ());
136- // dwi.new_records_received = m_records_received.exchange(0);
137+ // dwi.new_records_received = m_records_received.exchange(0);
137138 dwi.set_records_written (m_records_written_tot.load ());
138139 dwi.set_new_records_written (m_records_written.exchange (0 ));
139- // dwi.bytes_output = m_bytes_output_tot.load(); MR: byte writing to be delegated to DataStorage
140- // dwi.new_bytes_output = m_bytes_output.exchange(0);
140+ // dwi.bytes_output = m_bytes_output_tot.load(); MR: byte writing to be delegated to DataStorage
141+ // dwi.new_bytes_output = m_bytes_output.exchange(0);
141142 dwi.set_writing_time_us (m_writing_us.exchange (0 ));
142143
143144 publish (std::move (dwi));
144145}
145-
146+
146147void
147148DataWriterModule::do_conf (const CommandData_t&)
148149{
149150 TLOG_DEBUG (TLVL_ENTER_EXIT_METHODS) << get_name () << " : Entering do_conf() method" ;
150151
151152 m_data_storage_prescale = m_data_writer_conf->get_data_storage_prescale ();
152153 TLOG_DEBUG (TLVL_CONFIG) << get_name () << " : data_storage_prescale is " << m_data_storage_prescale;
153- TLOG_DEBUG (TLVL_CONFIG) << get_name () << " : data_store_parameters are " << m_data_writer_conf->get_data_store_params ();
154+ TLOG_DEBUG (TLVL_CONFIG) << get_name () << " : data_store_parameters are "
155+ << m_data_writer_conf->get_data_store_params ();
154156 m_min_write_retry_time_usec = m_data_writer_conf->get_min_write_retry_time_ms () * 1000 ;
155157 if (m_min_write_retry_time_usec < 1 ) {
156158 m_min_write_retry_time_usec = 1 ;
@@ -162,7 +164,8 @@ DataWriterModule::do_conf(const CommandData_t&)
162164 try {
163165 m_data_writer = make_data_store (m_data_writer_conf->get_data_store_params ()->get_type (),
164166 m_data_writer_conf->get_data_store_params ()->UID (),
165- m_module_configuration, m_writer_identifier);
167+ m_module_configuration,
168+ m_writer_identifier);
166169 register_node (" data_writer" , m_data_writer);
167170 } catch (const ers::Issue& excpt) {
168171 throw UnableToConfigure (ERS_HERE, get_name (), excpt);
180183DataWriterModule::do_start (const CommandData_t& payload)
181184{
182185 TLOG_DEBUG (TLVL_ENTER_EXIT_METHODS) << get_name () << " : Entering do_start() method" ;
183-
186+
184187 rcif::cmd::StartParams start_params = payload.get <rcif::cmd::StartParams>();
185188 m_data_storage_is_enabled = (!start_params.disable_data_storage );
186189 m_run_number = start_params.run ;
@@ -204,7 +207,7 @@ DataWriterModule::do_start(const CommandData_t& payload)
204207 std::this_thread::sleep_for (std::chrono::microseconds (5000 ));
205208 }
206209 } while (wasSentSuccessfully);
207-
210+
208211 // 04-Feb-2021, KAB: added this call to allow DataStore to prepare for the run.
209212 // I've put this call fairly early in this method because it could throw an
210213 // exception and abort the run start. And, it seems sensible to avoid starting
@@ -217,7 +220,7 @@ DataWriterModule::do_start(const CommandData_t& payload)
217220 // in case the "start" has been called before the "conf"
218221 ers::fatal (InvalidDataWriterModule (ERS_HERE, get_name ()));
219222 }
220-
223+
221224 try {
222225 m_data_writer->prepare_for_run (m_run_number, (start_params.production_vs_test == " TEST" ));
223226 } catch (const ers::Issue& excpt) {
@@ -226,7 +229,7 @@ DataWriterModule::do_start(const CommandData_t& payload)
226229 }
227230
228231 m_seqno_counts.clear ();
229-
232+
230233 m_records_received = 0 ;
231234 m_records_received_tot = 0 ;
232235 m_records_written = 0 ;
@@ -237,8 +240,8 @@ DataWriterModule::do_start(const CommandData_t& payload)
237240 m_running.store (true );
238241
239242 m_thread.start_working_thread (get_name ());
240- // iomanager::IOManager::get()->add_callback<std::unique_ptr<daqdataformats::TriggerRecord>>( m_trigger_record_connection,
241- // bind( &DataWriterModule::receive_trigger_record, this, std::placeholders::_1) );
243+ // iomanager::IOManager::get()->add_callback<std::unique_ptr<daqdataformats::TriggerRecord>>(
244+ // m_trigger_record_connection, bind( &DataWriterModule::receive_trigger_record, this, std::placeholders::_1) );
242245
243246 TLOG () << get_name () << " successfully started for run number " << m_run_number;
244247 TLOG_DEBUG (TLVL_ENTER_EXIT_METHODS) << get_name () << " : Exiting do_start() method" ;
@@ -250,8 +253,9 @@ DataWriterModule::do_stop(const CommandData_t& /*args*/)
250253 TLOG_DEBUG (TLVL_ENTER_EXIT_METHODS) << get_name () << " : Entering do_stop() method" ;
251254
252255 m_running.store (false );
253- m_thread.stop_working_thread ();
254- // iomanager::IOManager::get()->remove_callback<std::unique_ptr<daqdataformats::TriggerRecord>>( m_trigger_record_connection );
256+ m_thread.stop_working_thread ();
257+ // iomanager::IOManager::get()->remove_callback<std::unique_ptr<daqdataformats::TriggerRecord>>(
258+ // m_trigger_record_connection );
255259
256260 // 04-Feb-2021, KAB: added this call to allow DataStore to finish up with this run.
257261 // I've put this call fairly late in this method so that any draining of queues
@@ -280,21 +284,24 @@ DataWriterModule::do_scrap(const CommandData_t& /*payload*/)
280284}
281285
282286void
283- DataWriterModule::receive_trigger_record (std::unique_ptr<daqdataformats::TriggerRecord> & trigger_record_ptr)
287+ DataWriterModule::receive_trigger_record (std::unique_ptr<daqdataformats::TriggerRecord>& trigger_record_ptr)
284288{
285289 TLOG_DEBUG (TLVL_ENTER_EXIT_METHODS) << get_name () << " : receiving a new TR ptr" ;
286290
287291 ++m_records_received;
288292 ++m_records_received_tot;
289293 TLOG_DEBUG (TLVL_WORK_STEPS) << get_name () << " : Obtained the TriggerRecord for trigger number "
290- << trigger_record_ptr->get_header_ref ().get_trigger_number () << " ."
291- << trigger_record_ptr->get_header_ref ().get_sequence_number ()
292- << " , run number " << trigger_record_ptr->get_header_ref ().get_run_number ()
293- << " off the input connection" ;
294+ << trigger_record_ptr->get_header_ref ().get_trigger_number () << " ."
295+ << trigger_record_ptr->get_header_ref ().get_sequence_number () << " , run number "
296+ << trigger_record_ptr->get_header_ref ().get_run_number () << " off the input connection" ;
294297
295298 if (trigger_record_ptr->get_header_ref ().get_run_number () != m_run_number) {
296- ers::error (InvalidRunNumber (ERS_HERE, get_name (), " TriggerRecord" , trigger_record_ptr->get_header_ref ().get_run_number (),
297- m_run_number, trigger_record_ptr->get_header_ref ().get_trigger_number (),
299+ ers::error (InvalidRunNumber (ERS_HERE,
300+ get_name (),
301+ " TriggerRecord" ,
302+ trigger_record_ptr->get_header_ref ().get_run_number (),
303+ m_run_number,
304+ trigger_record_ptr->get_header_ref ().get_trigger_number (),
298305 trigger_record_ptr->get_header_ref ().get_sequence_number ()));
299306 return ;
300307 }
@@ -304,50 +311,50 @@ DataWriterModule::receive_trigger_record(std::unique_ptr<daqdataformats::Trigger
304311 // instead of zero, since I think that it would be nice to always get the first event
305312 // written out.
306313 if (m_data_storage_prescale <= 1 || ((m_records_received_tot.load () % m_data_storage_prescale) == 1 )) {
307-
314+
308315 if (m_data_storage_is_enabled) {
309316
310317 std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now ();
311-
318+
312319 bool should_retry = true ;
313320 size_t retry_wait_usec = m_min_write_retry_time_usec;
314321 do {
315- should_retry = false ;
316- try {
317- m_data_writer->write (*trigger_record_ptr);
318- ++m_records_written;
319- ++m_records_written_tot;
320- m_bytes_output += trigger_record_ptr->get_total_size_bytes ();
321- m_bytes_output_tot += trigger_record_ptr->get_total_size_bytes ();
322- } catch (const RetryableDataStoreProblem& excpt) {
323- should_retry = true ;
324- ers::error (DataWritingProblem (ERS_HERE,
325- get_name (),
326- trigger_record_ptr->get_header_ref ().get_trigger_number (),
327- trigger_record_ptr->get_header_ref ().get_sequence_number (),
328- trigger_record_ptr->get_header_ref ().get_run_number (),
329- excpt));
330- if (retry_wait_usec > m_max_write_retry_time_usec) {
331- retry_wait_usec = m_max_write_retry_time_usec;
332- }
333- usleep (retry_wait_usec);
334- retry_wait_usec *= m_write_retry_time_increase_factor;
335- } catch (const std::exception& excpt) {
336- ers::error (DataWritingProblem (ERS_HERE,
337- get_name (),
338- trigger_record_ptr->get_header_ref ().get_trigger_number (),
339- trigger_record_ptr->get_header_ref ().get_sequence_number (),
340- trigger_record_ptr->get_header_ref ().get_run_number (),
341- excpt));
342- }
322+ should_retry = false ;
323+ try {
324+ m_data_writer->write (*trigger_record_ptr);
325+ ++m_records_written;
326+ ++m_records_written_tot;
327+ m_bytes_output += trigger_record_ptr->get_total_size_bytes ();
328+ m_bytes_output_tot += trigger_record_ptr->get_total_size_bytes ();
329+ } catch (const RetryableDataStoreProblem& excpt) {
330+ should_retry = true ;
331+ ers::error (DataWritingProblem (ERS_HERE,
332+ get_name (),
333+ trigger_record_ptr->get_header_ref ().get_trigger_number (),
334+ trigger_record_ptr->get_header_ref ().get_sequence_number (),
335+ trigger_record_ptr->get_header_ref ().get_run_number (),
336+ excpt));
337+ if (retry_wait_usec > m_max_write_retry_time_usec) {
338+ retry_wait_usec = m_max_write_retry_time_usec;
339+ }
340+ usleep (retry_wait_usec);
341+ retry_wait_usec *= m_write_retry_time_increase_factor;
342+ } catch (const std::exception& excpt) {
343+ ers::error (DataWritingProblem (ERS_HERE,
344+ get_name (),
345+ trigger_record_ptr->get_header_ref ().get_trigger_number (),
346+ trigger_record_ptr->get_header_ref ().get_sequence_number (),
347+ trigger_record_ptr->get_header_ref ().get_run_number (),
348+ excpt));
349+ }
343350 } while (should_retry && m_running.load ());
344351
345352 std::chrono::steady_clock::time_point end_time = std::chrono::steady_clock::now ();
346353 auto writing_time = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
347354 m_writing_us += writing_time.count ();
348355 } // if m_data_storage_is_enabled
349356 }
350-
357+
351358 bool send_trigger_complete_message = m_running.load ();
352359 if (trigger_record_ptr->get_header_ref ().get_max_sequence_number () > 0 ) {
353360 daqdataformats::trigger_number_t trigno = trigger_record_ptr->get_header_ref ().get_trigger_number ();
@@ -362,50 +369,49 @@ DataWriterModule::receive_trigger_record(std::unique_ptr<daqdataformats::Trigger
362369 m_seqno_counts.erase (trigno);
363370 } else {
364371 // Using const .count and .at to avoid reintroducing element to map
365- TLOG_DEBUG (TLVL_SEQNO_MAP_CONTENTS) << get_name () << " : the sequence number count for trigger number " << trigno
366- << " is " << (m_seqno_counts.count (trigno) ? m_seqno_counts.at (trigno) : 0 ) << " (number of entries "
367- << " in the seqno map is " << m_seqno_counts.size () << " )." ;
372+ TLOG_DEBUG (TLVL_SEQNO_MAP_CONTENTS)
373+ << get_name () << " : the sequence number count for trigger number " << trigno << " is "
374+ << (m_seqno_counts.count (trigno) ? m_seqno_counts.at (trigno) : 0 ) << " (number of entries "
375+ << " in the seqno map is " << m_seqno_counts.size () << " )." ;
368376 send_trigger_complete_message = false ;
369377 }
370378 }
371379 if (send_trigger_complete_message) {
372380 TLOG_DEBUG (TLVL_WORK_STEPS) << get_name () << " : Pushing the TriggerDecisionToken for trigger number "
373- << trigger_record_ptr->get_header_ref ().get_trigger_number ()
374- << " onto the relevant output queue" ;
381+ << trigger_record_ptr->get_header_ref ().get_trigger_number ()
382+ << " onto the relevant output queue" ;
375383 dfmessages::TriggerDecisionToken token;
376384 token.run_number = m_run_number;
377385 token.trigger_number = trigger_record_ptr->get_header_ref ().get_trigger_number ();
378386 token.decision_destination = m_trigger_decision_connection;
379387
380388 bool wasSentSuccessfully = false ;
381- do {
389+ do {
382390 try {
383- m_token_output -> send ( std::move (token), m_queue_timeout );
384- wasSentSuccessfully = true ;
391+ m_token_output-> send (std::move (token), m_queue_timeout);
392+ wasSentSuccessfully = true ;
385393 } catch (const ers::Issue& excpt) {
386- std::ostringstream oss_warn;
387- oss_warn << " Send with sender \" " << m_token_output -> get_name () << " \" failed" ;
388- ers::warning (iomanager::OperationFailed (ERS_HERE, oss_warn.str (), excpt));
394+ std::ostringstream oss_warn;
395+ oss_warn << " Send with sender \" " << m_token_output-> get_name () << " \" failed" ;
396+ ers::warning (iomanager::OperationFailed (ERS_HERE, oss_warn.str (), excpt));
389397 }
390398 } while (!wasSentSuccessfully && m_running.load ());
391-
392399 }
393-
400+
394401 TLOG_DEBUG (TLVL_ENTER_EXIT_METHODS) << get_name () << " : operations completed for TR" ;
395402} // NOLINT(readability/fn_size)
396403
397404void
398- DataWriterModule::do_work (std::atomic<bool >& running_flag) {
405+ DataWriterModule::do_work (std::atomic<bool >& running_flag)
406+ {
399407 while (running_flag.load ()) {
400- try {
401- std::unique_ptr<daqdataformats::TriggerRecord> tr = m_tr_receiver-> receive (std::chrono::milliseconds (10 ));
402- receive_trigger_record (tr);
403- }
404- catch (const iomanager::TimeoutExpired& excpt) {
405- }
406- catch (const ers::Issue & excpt) {
407- ers::warning (excpt);
408- }
408+ try {
409+ std::unique_ptr<daqdataformats::TriggerRecord> tr = m_tr_receiver->receive (std::chrono::milliseconds (10 ));
410+ receive_trigger_record (tr);
411+ } catch (const iomanager::TimeoutExpired& excpt) {
412+ } catch (const ers::Issue& excpt) {
413+ ers::warning (excpt);
414+ }
409415 }
410416}
411417
0 commit comments