From bfa34ab1e9f454b81d0f1601cee305f25f67b54b Mon Sep 17 00:00:00 2001 From: vlad shkapenyuk Date: Fri, 18 Oct 2019 14:52:36 -0400 Subject: [PATCH] Add advancing timestamp_ts based on system time to unblock merged streams Signed-off-by: vlad shkapenyuk Change-Id: I2599b89687f9459f5af6a9fbe082f14ddf691b68 --- src/ftacmp/generate_lfta_code.cc | 42 +++++++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/src/ftacmp/generate_lfta_code.cc b/src/ftacmp/generate_lfta_code.cc index 06bfc9b..98d872d 100644 --- a/src/ftacmp/generate_lfta_code.cc +++ b/src/ftacmp/generate_lfta_code.cc @@ -1842,30 +1842,36 @@ string generate_fta_clock(string node_name, table_list *schema, unsigned time_co ret += "\t#endif\n"; // we need to pay special attention to time fields - if (field == "time" || field == "timestamp"){ - ret += "\tcur_time = time(&cur_time);\n"; + if (field == "time" || field == "timestamp" || field == "timestamp_ms"){ + ret += "\tcur_time = time(&cur_time);\n"; - if (field == "time") { - sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n", + if (field == "time") { + sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n", tblref, time_corr); - ret += tmpstr; - sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n", - field.c_str(), tblref, field.c_str(), tblref, time_corr); - } else { - sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n", + ret += tmpstr; + sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n", + field.c_str(), tblref, field.c_str(), tblref, time_corr); + } else if (field == "timestamp_ms") { + sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((t->last_timestamp_ms_%d/1000) < (cur_time - %d))) {\n", + tblref, time_corr); + ret += tmpstr; + sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = (cur_time - %d) * 1000;\n", + field.c_str(), tblref, field.c_str(), tblref, time_corr); + }else{ + sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n", field.c_str(), tblref, time_corr); - ret += tmpstr; - sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n", + ret += tmpstr; + sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n", field.c_str(), tblref, field.c_str(), tblref, time_corr); - } - ret += tmpstr; + } + ret += tmpstr; - ret += "\t\ttime_advanced = 1;\n"; - ret += "\t}\n"; + ret += "\t\ttime_advanced = 1;\n"; + ret += "\t}\n"; - sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n", - field.c_str(), tblref, field.c_str(), tblref); - ret += tmpstr; + sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n", + field.c_str(), tblref, field.c_str(), tblref); + ret += tmpstr; } else { sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref); -- 2.16.6