1010// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
1111// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
1212
13- #include " perspective/raw_types.h"
1413#include < perspective/first.h>
1514#include < perspective/context_unit.h>
1615#include < perspective/context_zero.h>
@@ -49,9 +48,7 @@ calc_negate(t_tscalar val) {
4948 return val.negate ();
5049}
5150
52- t_gnode::t_gnode (
53- t_schema input_schema, t_schema output_schema, t_uindex limit
54- ) :
51+ t_gnode::t_gnode (t_schema input_schema, t_schema output_schema) :
5552 m_mode (NODE_PROCESSING_SIMPLE_DATAFLOW)
5653#ifdef PSP_PARALLEL_FOR
5754 ,
@@ -63,7 +60,6 @@ t_gnode::t_gnode(
6360 m_output_schema (std::move(output_schema)),
6461 m_init (false ),
6562 m_id (0 ),
66- m_limit (limit),
6763 m_last_input_port_id (0 ),
6864 m_pool_cleanup ([]() {}) {
6965 PSP_TRACE_SENTINEL ();
@@ -89,10 +85,6 @@ t_gnode::t_gnode(
8985 existed_schema
9086 };
9187 m_epoch = std::chrono::high_resolution_clock::now ();
92-
93- m_input_schema.add_column (
94- " psp_old_pkey" , m_input_schema.get_dtype (" psp_pkey" )
95- );
9688}
9789
9890t_gnode::~t_gnode () {
10597t_gnode::init () {
10698 PSP_TRACE_SENTINEL ();
10799
108- m_gstate =
109- std::make_shared<t_gstate>(m_input_schema, m_output_schema, m_limit);
100+ m_gstate = std::make_shared<t_gstate>(m_input_schema, m_output_schema);
110101 m_gstate->init ();
111102
112103 // Create and store the main input port, which is always port 0. The next
@@ -132,7 +123,7 @@ t_gnode::init() {
132123
133124 for (const auto & iter : m_input_ports) {
134125 std::shared_ptr<t_port> input_port = iter.second ;
135- input_port->get_table ()->flatten (m_limit );
126+ input_port->get_table ()->flatten ();
136127 }
137128
138129 // Initialize expression-related state
@@ -307,22 +298,16 @@ t_gnode::_process_table(t_uindex port_id) {
307298 }
308299
309300 m_was_updated = true ;
310- flattened = input_port->get_table ()->flatten (m_limit );
301+ flattened = input_port->get_table ()->flatten ();
311302
312303 PSP_GNODE_VERIFY_TABLE (flattened);
313304 PSP_GNODE_VERIFY_TABLE (get_table ());
314305
315306 t_uindex flattened_num_rows = flattened->num_rows ();
307+
316308 std::vector<t_rlookup> row_lookup (flattened_num_rows);
317309 t_column* pkey_col = flattened->get_column (" psp_pkey" ).get ();
318310
319- #if PSP_DEBUG
320- LOG_DEBUG (" m_mapping" );
321- for (const auto [k, v] : m_gstate->get_pkey_map ()) {
322- LOG_DEBUG (" KEY: " << k << " , VALUE: " << v);
323- }
324- #endif
325-
326311 for (t_uindex idx = 0 ; idx < flattened_num_rows; ++idx) {
327312 // See if each primary key in flattened already exist in the dataset
328313 t_tscalar pkey = pkey_col->get_scalar (idx);
0 commit comments