1010// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
1111// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
1212
13+ #include " perspective/raw_types.h"
1314#include < perspective/first.h>
1415#include < perspective/context_unit.h>
1516#include < perspective/context_zero.h>
@@ -48,7 +49,9 @@ calc_negate(t_tscalar val) {
4849 return val.negate ();
4950}
5051
51- t_gnode::t_gnode (t_schema input_schema, t_schema output_schema) :
52+ t_gnode::t_gnode (
53+ t_schema input_schema, t_schema output_schema, t_uindex limit
54+ ) :
5255 m_mode (NODE_PROCESSING_SIMPLE_DATAFLOW)
5356#ifdef PSP_PARALLEL_FOR
5457 ,
@@ -60,6 +63,7 @@ t_gnode::t_gnode(t_schema input_schema, t_schema output_schema) :
6063 m_output_schema (std::move(output_schema)),
6164 m_init (false ),
6265 m_id (0 ),
66+ m_limit (limit),
6367 m_last_input_port_id (0 ),
6468 m_pool_cleanup ([]() {}) {
6569 PSP_TRACE_SENTINEL ();
@@ -85,6 +89,10 @@ t_gnode::t_gnode(t_schema input_schema, t_schema output_schema) :
8589 existed_schema
8690 };
8791 m_epoch = std::chrono::high_resolution_clock::now ();
92+
93+ m_input_schema.add_column (
94+ " psp_old_pkey" , m_input_schema.get_dtype (" psp_pkey" )
95+ );
8896}
8997
9098t_gnode::~t_gnode () {
97105t_gnode::init () {
98106 PSP_TRACE_SENTINEL ();
99107
100- m_gstate = std::make_shared<t_gstate>(m_input_schema, m_output_schema);
108+ m_gstate =
109+ std::make_shared<t_gstate>(m_input_schema, m_output_schema, m_limit);
101110 m_gstate->init ();
102111
103112 // Create and store the main input port, which is always port 0. The next
@@ -123,7 +132,7 @@ t_gnode::init() {
123132
124133 for (const auto & iter : m_input_ports) {
125134 std::shared_ptr<t_port> input_port = iter.second ;
126- input_port->get_table ()->flatten ();
135+ input_port->get_table ()->flatten (m_limit );
127136 }
128137
129138 // Initialize expression-related state
@@ -186,11 +195,13 @@ t_gnode::calc_transition(
186195
187196 if (!row_pre_existed && !cur_valid && !t_env::backout_invalid_neq_ft ()) {
188197 trans = VALUE_TRANSITION_NEQ_FT;
189- } else if (row_pre_existed && !prev_valid && !cur_valid && !t_env::backout_eq_invalid_invalid ()) {
198+ } else if (row_pre_existed && !prev_valid && !cur_valid
199+ && !t_env::backout_eq_invalid_invalid ()) {
190200 trans = VALUE_TRANSITION_EQ_TT;
191201 } else if (!prev_existed && !exists) {
192202 trans = VALUE_TRANSITION_EQ_FF;
193- } else if (row_pre_existed && exists && !prev_valid && cur_valid && !t_env::backout_nveq_ft ()) {
203+ } else if (row_pre_existed && exists && !prev_valid && cur_valid
204+ && !t_env::backout_nveq_ft ()) {
194205 trans = VALUE_TRANSITION_NVEQ_FT;
195206 } else if (prev_existed && exists && prev_cur_eq) {
196207 trans = VALUE_TRANSITION_EQ_TT;
@@ -298,16 +309,22 @@ t_gnode::_process_table(t_uindex port_id) {
298309 }
299310
300311 m_was_updated = true ;
301- flattened = input_port->get_table ()->flatten ();
312+ flattened = input_port->get_table ()->flatten (m_limit );
302313
303314 PSP_GNODE_VERIFY_TABLE (flattened);
304315 PSP_GNODE_VERIFY_TABLE (get_table ());
305316
306317 t_uindex flattened_num_rows = flattened->num_rows ();
307-
308318 std::vector<t_rlookup> row_lookup (flattened_num_rows);
309319 t_column* pkey_col = flattened->get_column (" psp_pkey" ).get ();
310320
321+ #if PSP_DEBUG
322+ LOG_DEBUG (" m_mapping" );
323+ for (const auto [k, v] : m_gstate->get_pkey_map ()) {
324+ LOG_DEBUG (" KEY: " << k << " , VALUE: " << v);
325+ }
326+ #endif
327+
311328 for (t_uindex idx = 0 ; idx < flattened_num_rows; ++idx) {
312329 // See if each primary key in flattened already exist in the dataset
313330 t_tscalar pkey = pkey_col->get_scalar (idx);
0 commit comments