import datetime
from functools import reduce
import json
from operator import getitem
import sys
import os
import itertools

# Example recipe file:
# /home/pakin/Codes/coffCfg/cofffeemachineConfig/coffeethai02_1550.json


def GetDateTimeString():
    """Return the current local time formatted as 'DD-Mon-YYYY, HH:MM:SS'."""
    return datetime.datetime.now().strftime("%d-%b-%Y, %H:%M:%S")


# Base directories used to resolve bare config numbers into recipe file paths.
HomeDirectory = "/home/pakin"
CoffeeRecipeDirectory = "/home/pakin/Codes/coffCfg/cofffeemachineConfig"

# Reference length of a well-formed product code, e.g. "12-01-02-0001".
valid_len_product_code = len("12-01-02-0001")

# events / events_json - every action performed by a merge, recorded both as
# tab-separated text lines and as structured maps.
events = []
events_json = []

# last_change - current value of the most recent "LastChange" key seen.
last_change = ""
# last_change_path - dotted key path leading to that "LastChange" value;
# split on '.' to access the value in the nested map.
last_change_path = ""

# master_json - main dictionary (map) that other json files are merged into.
master_json = {}

# config_ver - the target main dictionary's "configNumber" (-1 until known).
config_ver = -1

# holdonPD - the product code currently being held/processed.
holdonPD = ""

# isJson - whether the change log output should also be written as json.
isJson = False

# Set value of nested map (target - master_json) by using keys from
# 'key_list' and assigning the value to it.
def set_value_in_nested_map(key_list: list, value):
    """Write `value` into the global `master_json` at the nested location
    named by `key_list` (all keys but the last select sub-maps/lists)."""
    reduce(getitem, key_list[:-1], master_json)[key_list[-1]] = value


def get_value_in_nested_map(target_json: dict, key_list: list, isMaster=False):
    """Read the value at the nested location `key_list` inside `target_json`.

    When the path goes through "Recipe01" and `isMaster` is True, the second
    key (a list index) is shifted by the global `shared_master_position` so
    the lookup lands on the matching entry in the master list.
    NOTE(review): this mutates the caller's `key_list` in place, and
    `shared_master_position` only exists after merge_lists() has run -
    confirm call order.
    Returns "-" when a "SubMenu" path points at an empty container.
    """
    if "Recipe01" in key_list:
        if isMaster:
            key_list[1] += shared_master_position
    if "SubMenu" in key_list:
        if len(reduce(getitem, key_list[:3], target_json)) <= 0:
            return "-"
    return reduce(getitem, key_list[:-1], target_json)[key_list[-1]]


# Decode a dotted path string into a key list, e.g. "a.0.b" -> ["a", 0, "b"].
def decode_path(str_with_dot: str) -> list:
    """Split a dotted path; purely numeric segments become int list indices.

    Fix: removed a dead `elif keyi[1:].isdigit(): pass` branch - it had no
    effect, since segments like "-1" were appended unchanged either way.
    """
    keylist = []
    for keyi in str_with_dot.split('.'):
        if keyi.isdigit():
            keyi = int(keyi)
        keylist.append(keyi)
    return keylist


def fetch_pd(str_path: str, target_dict: dict) -> str:
    """Return the "productCode" stored under the dotted path `str_path`."""
    keyList = decode_path(str_with_dot=str_path)
    keyList.append("productCode")
    return get_value_in_nested_map(target_json=target_dict, key_list=keyList)


def fetch_matId(str_path: str, target_dict: dict) -> str:
    """Return the "materialPathId" stored under the dotted path `str_path`."""
    keyList = decode_path(str_with_dot=str_path)
    keyList.append("materialPathId")
    return get_value_in_nested_map(target_json=target_dict, key_list=keyList)


def fetch_defaultId(str_path: str, target_dict: dict) -> str:
    """Return the "defaultIDSelect" stored under the dotted path `str_path`."""
    keyList = decode_path(str_with_dot=str_path)
    keyList.append("defaultIDSelect")
    return get_value_in_nested_map(target_json=target_dict, key_list=keyList)


def fetch_onlyMainMenuPath(str_with_dot: str):
    """Return only the first two segments of a dotted path, e.g. "Recipe01.3"."""
    mainpath = decode_path(str_with_dot)[:2]
    return ".".join(str(p) for p in mainpath)


def create_map(events_action: str, log: str, additional=None) -> dict:
    """Build one structured log entry for events_json.

    Fix: the old default `additional=[]` was a shared mutable default
    (one list object reused across calls); use None and create a fresh
    list per call. The return annotation was also corrected from `-> str`
    to `-> dict`. Callers that pass an explicit list are unaffected.
    """
    return {
        "timestamp": GetDateTimeString(),
        "event": events_action,
        "log": log,
        "addition": additional if additional is not None else []
    }

# Merge diff value or append it to the main file.
# merge(args) - merge a "dev" config json into a "master" config json and
# write the merged result plus .json/.log/.html change logs.
# 0 - master_path = master.json / to be merged (file path, or bare config number)
# 1 - dev_path = dev.json / to merge into another
# 2 - outfile = output
# 3 - changefile = track log
# 4 - debug = enable debug (optional; pass "debug" for verbose prints)
def merge(args):
    print("ARGS: => ",args)
    master_path = args[0]
    dev_path = args[1]
    outfile_path = args[2]
    changefile_path = args[3]
    # file_path = changefile_path without extension; all log outputs
    # (.json/.log/.html) below are derived from it.
    file_path, out_ext = os.path.splitext(changefile_path)
    if "json" in out_ext:
        global isJson
        isJson = True
    if len(args) > 4:
        # NOTE(review): `debug` only exists when a 5th argument is given;
        # every later read of it is wrapped in try/except to survive the
        # NameError when it was never set.
        global debug
        debug = args[4] if args[4] != None else False
    # A bare config number is resolved to a recipe file in CoffeeRecipeDirectory.
    if (os.path.exists(master_path) == False) and str(master_path).isdigit():
        master_path = CoffeeRecipeDirectory + "/coffeethai02_" + str(master_path) + ".json"
    master_file = open(master_path, 'rb')
    if (os.path.exists(dev_path) == False) and str(dev_path).isdigit():
        dev_path = CoffeeRecipeDirectory + "/coffeethai02_" + str(dev_path) + ".json"
    dev_file = open(dev_path, 'rb')
    masterName = master_file.name
    devName = dev_file.name
    # Read both files fully as bytes (the file objects are then rebound to
    # their contents; the underlying handles are never explicitly closed).
    master_file = master_file.raw.readall()
    dev_file = dev_file.raw.readall()
    print("Master file size => ",os.stat(master_path).st_size)
    print("Dev file size => ",os.stat(dev_path).st_size)
    # events_json.append(create_map(events_action="MERGE", log=devName.split("/")[-1]+" into "+masterName.split("/")[-1]))
    events.append(GetDateTimeString()+"\t[MERGE]\t\tMerging "+devName.split("/")[-1]+" into "+masterName.split("/")[-1]+"\n")
    # print(GetDateTimeString()+"\tMerging "+dev_file.name.split("/")[-1]+" into "+master_file.name.split("/")[-1]+"\n")
    print(events[len(events) - 1])
    # print("Test maintain => ", MaintianUnicodeEscapeDecoder.decoder(s=master_file))
    global master_json
    global dev_json
    master_json_file:dict = json.loads(master_file)
    master_json = master_json_file.copy()
    dev_json_file:dict = json.loads(dev_file)
    dev_json = dev_json_file.copy()
    # NOTE(review): no `global config_ver` here, so this assignment creates a
    # function-local config_ver; the module-level one is only updated inside
    # merge_dicts(). Confirm this is intended.
    config_ver = master_json["MachineSetting"]["configNumber"]
    global pdchange
    pdchange = 0
    global pdadd
    pdadd = 0
    # global holdonPD
    # NOTE(review): likewise function-local; the module-level holdonPD that
    # merge_dicts reads is untouched here.
    holdonPD = ""
    # Do the actual merge (mutates master_json in place).
    merge_dicts(master_json, dev_json_file)
    print(master_json["MachineSetting"]["configNumber"])
    try:
        if debug == "debug":
            print("/".join(changefile_path.split("/")[:-1]))
    except:
        pass
    # Create the output/log directories when missing.
    if (os.path.exists("/".join(outfile_path.split("/")[:-1])) == False):
        os.makedirs("/".join(outfile_path.split("/")[:-1]))
    if (os.path.exists("/".join(changefile_path.split("/")[:-1])) == False):
        os.makedirs("/".join(changefile_path.split("/")[:-1]))
    with open(outfile_path, "w+", encoding="utf-8") as outfile:
        json.dump(master_json, outfile, indent=2, ensure_ascii=False)
    # Include counts
    # NOTE(review): both json COUNT entries use the label "Total=" - the
    # second one is the insert count; consider distinct labels.
    events_json.append(create_map(events_action="COUNT", log="Total="+str(pdchange)))
    events_json.append(create_map(events_action="COUNT", log="Total="+str(pdadd)))
    events_json.append(create_map(events_action="OUTPUT", log="Finished! write output to "+outfile_path))
    events_json.append(create_map(events_action="LOG", log="Log is saved to "+changefile_path))
    events.append(GetDateTimeString()+"\t[COUNT]\t\tTotal Change: "+str(pdchange)+"\n")
    events.append(GetDateTimeString()+"\t[COUNT]\t\tTotal Insert: "+str(pdadd)+"\n")
    events.append(GetDateTimeString()+"\t[OUTPUT]\t\tFinished! write output to "+outfile_path+"\n")
    events.append(GetDateTimeString()+"\t[LOG]\t\tLog is saved to "+changefile_path+"\n")
    # log json file
    # NOTE(review): os.stat raises FileNotFoundError when the .json log does
    # not exist yet - confirm callers pre-create it.
    if os.stat(file_path+".json").st_size == 0:
        with open(file_path+".json", "w+",encoding="utf-8") as outlogfile:
            json.dump({"logs"+GetDateTimeString()+"*": events_json}, outlogfile, indent=2, ensure_ascii=False)
    else:
        # Existing log: load, add this run under a timestamped key, rewrite.
        print(file_path+".json")
        logjs:dict = json.loads(open(file_path+".json", encoding="utf-8").read())
        logjs["logs"+GetDateTimeString()+"*"] = events_json
        json.dump(logjs, open(file_path+".json", "w+", encoding="utf-8"), indent=2, ensure_ascii=False)
    # log file
    with open(file_path+".log", "a+") as outlogfile2:
        try:
            for event in events:
                outlogfile2.write(event)
        except:
            raise Exception(event)
    # Create html version
    # NOTE(review): the HTML markup in the literals below appears to have been
    # stripped by whatever flattened this file - the strings are a best-effort
    # reconstruction of the remaining tokens. Recover the original tags from
    # version control before trusting the .html output.
    with open(file_path+".html", "a+") as outlogHtml:
        for event in events:
            # Create div
            # print("Log as list: ",str(event).split("\t"))
            html_string = "\t\n"
            event_fraction = str(event).split("\t")
            for i in event_fraction:
                if i != "" and i != "\n" and i != "---":
                    if "|" in i and not i.endswith("|"):
                        # CHANGE
                        spl_text = i.split("|")
                        html_string += "\t\t"+spl_text[0]+"\n"
                        html_string += "\t\t"+spl_text[1].replace("\n","")+"\n"
                    elif ">>>" in i:
                        # INSERT
                        spl_text = i.split(">>>")
                        html_string += "\t\t"+spl_text[0]+"\n"
                        html_string += "\t\t"+spl_text[1].replace("\n","")+"\n"
                    elif i.endswith("|"):
                        html_string += "\t\t"+i[:-1]+"\n"
                    else:
                        # print("Default = ", i)
                        # Either version, status or others
                        html_string += "\t\t"+i.replace("\n","")+"\n"
            html_string += "\t\n"
            outlogHtml.write(html_string)


# Merge dictionary - called by `merge`, used when the value is `dict` type
# original - main file to merge/append value into
# updated - source of new value (must be in the same structure)
# path - default = "". This variable is used for saving keys as a dotted
#        path for accessing the nested map
def merge_dicts(original:dict, updated:dict, path=""):
    for key, value in updated.items():
        current_path = f"{path}.{key}" if path else key
        if key in original:
            if key == "Timestamp":
                # Remember the dev file's previous generation timestamp; read
                # later as a fallback when no LastChange has been seen yet.
                global pre_timestamp
                pre_timestamp = value
                # change timestamp
                # original["Timestamp"] = GetDateTimeString()
                cfgnum = ""
                # events.append(GetDateTimeString()+"\t[TIMESTMP]\tLast Generated: "+value+cfgnum+"\tNew Generated at "+original["Timestamp"]+" \n")
                # print(events[len(events) - 1])
            else:
                if key == "LastChange":
                    # Track the most recent LastChange value and its path so
                    # CHANGE events can report both master and dev values.
                    global last_change
                    global last_change_path
                    last_change = value
                    last_change_path = current_path
                    # print("[LastChange] LastChange: ",last_change_path, " value = ",value)
                if isinstance(value, dict) and isinstance(original[key], dict):
                    merge_dicts(original[key], value, current_path)
                elif isinstance(value, list) and isinstance(original[key], list):
                    merge_lists(original[key], value, current_path)
                else:
                    # Detect diff
                    if original[key] != value:
                        if key == "configNumber":
                            # use master version
                            global config_ver
                            config_ver = master_json["MachineSetting"]["configNumber"]
                            # original[key] = config_ver
                            cfgnum = "\t[VER."+str(config_ver)+"]" if config_ver != -1 else "\t[...]"
                            try:
                                if debug == "debug":
                                    print("Config number ", config_ver)
                            except:
                                pass
                            events_json.append(
                                create_map(events_action="MERGE_TARGETS", log="Found `configNumber` => master version "+str(config_ver)+", merged with "+str(value)))
                            events.append(GetDateTimeString()+cfgnum+"\tFound `configNumber` => master version "+str(config_ver)+", merged with "+str(value)+" \n")
                        else:
                            # NOTE(review): `lc` is computed but never used.
                            lc = last_change if last_change != "" else pre_timestamp
                            try:
                                if debug == "debug":
                                    print("Encounter path --> "+current_path, " | master: ",original[key]," dev: ", value)
                            except:
                                pass
                            if "Recipe01" in current_path and not "recipes" in current_path:
                                global holdonPD
                                global pdchange
                                # Count each changed product only once (while
                                # holdonPD stays on the same productCode).
                                if "productCode" in original and holdonPD != original["productCode"]:
                                    holdonPD = original["productCode"]
                                    pdchange += 1
                                    events_json.append(create_map(
                                        events_action="CHANGE",
                                        log="",
                                        additional=[
                                            {
                                                "version": config_ver,
                                                "master_last_change": (get_value_in_nested_map(master_json, decode_path(last_change_path), isMaster=True)),
                                                "dev_last_change": last_change,
                                                "pd": holdonPD,
                                                "name": original["name"],
                                                "fullpath": current_path
                                            }
                                        ]
                                    ))
                                    events.append(GetDateTimeString()+"\t[VER."+str(config_ver)+"] [CHANGE]\t"+" LastChange: "+(get_value_in_nested_map(master_json, decode_path(last_change_path), isMaster=True))+" (master) |\t"+last_change+" (dev)\t"+"---\t \""+holdonPD+"\"\n")
                            # elif "MaterialSetting" in current_path:
                            #     events.append(GetDateTimeString()+"\t[VER."+str(config_ver)+"] [CHANGE]\t"+"\t MaterialSetting"+original["productCode"]+" --- "+" \""+str(value)+"\"("+original[key]+" // key:"+key+")\n")
                        # override original value by value from updated(dev)
                        original[key] = value
        else:
            # Key missing from master: record an INSERT+ event and add it.
            events_json.append(create_map(
                events_action="INSERT+",
                log="NewKeyValue",
                additional=[{
                    "version": config_ver,
                    "pd": holdonPD,
                    "new_value": value,
                    "fullpath": current_path
                }]
            ))
            events.append(GetDateTimeString()+"\t[VER."+str(config_ver)+"]\t\t\t[INSERT]\t"+"\tNew key & value >>> \""+holdonPD+" \""+str(value)+("\t\t\t")+"\n")
            try:
                if debug == "debug":
                    print("Add path --> "+path, " | master: "," dev: ", str(value))
            except:
                pass
            global pdadd
            pdadd += 1
            original[key] = value
def merge_lists(original, updated, path=""):
    """Merge list `updated` (dev) into list `original` (master) in place.

    Called by merge_dicts when both sides hold a list at the same key.
    Dict items are recursively merged (for "Recipe01" lists the matching
    master entry is located by productCode / name, since the two lists may
    be ordered differently); items missing from the master are appended
    and logged as INSERT events.

    Fixes vs. the previous revision:
      * the bound `i < len(original) - 1` excluded the last valid index, so
        the final master entry could never be merged and the dev item was
        appended as a duplicate instead; the bound is now `i < len(original)`.
      * the Recipe01 lookup compared `original[j][key]` against
        item["productCode"] even when the chosen key was "name" (matching
        unrelated entries); it now compares `item[key]`.
    """
    for i, item in enumerate(updated):
        current_path = f"{path}.{i}"
        if isinstance(item, dict):
            if i < len(original) and isinstance(original[i], dict):
                if path == "Recipe01":
                    # Locate the matching master entry by productCode,
                    # falling back to "name" when the master lacks one.
                    j = 0
                    if "productCode" not in original[i].keys():
                        key = "name"
                    else:
                        key = "productCode"
                    while original[j][key] != item[key] and j < len(original) - 1:
                        j += 1
                    # Share the index offset so get_value_in_nested_map can
                    # translate dev indices into master indices.
                    global shared_master_position
                    shared_master_position = j - i
                    # NOTE(review): when no entry matches, j stops at the last
                    # index and the merge still lands on that (unrelated)
                    # entry - consider an explicit not-found path.
                    merge_dicts(original[j], item, current_path)
                else:
                    merge_dicts(original[i], item, current_path)
            elif item not in original:
                # Dev has a dict the master lacks: append it and log an INSERT.
                try:
                    if debug == "debug":
                        print("Append dict@ i=", i, " path: ", current_path)
                except:
                    pass
                events_json.append(create_map(
                    events_action="INSERT",
                    log="AppendDict",
                    additional=[{
                        "version": config_ver,
                        "pd": fetch_pd(fetch_onlyMainMenuPath(current_path), master_json),
                        "fullpath": current_path
                    }]
                ))
                events.append(GetDateTimeString()+"\t[VER."+str(config_ver)+"]\t\t\t[INSERT]\t"+"\tNew value >>> "+fetch_pd(fetch_onlyMainMenuPath(current_path), master_json)+("\t\t\t")+"\n")
                global pdadd
                pdadd += 1
                original.append(item)
        elif item not in original:
            # Plain (non-dict) value missing from the master: append and log.
            try:
                if debug == "debug":
                    print("Append list@ i=", i, " path: ", current_path)
            except:
                pass
            events_json.append(create_map(
                events_action="INSERT",
                log="AppendList",
                additional=[{
                    "version": config_ver,
                    "pd": fetch_pd(fetch_onlyMainMenuPath(current_path), master_json),
                    "fullpath": current_path
                }]
            ))
            events.append(GetDateTimeString()+"\t[VER."+str(config_ver)+"]\t\t\t[INSERT]\t"+"\tNew value >>> "+fetch_pd(fetch_onlyMainMenuPath(current_path), master_json)+("\t\t\t")+"\n")
            pdadd += 1
            original.append(item)


def main():
    """CLI entry point: `<prog> merge <master> <dev> <outfile> <changefile> [debug]`."""
    command_line = sys.argv[1]
    print(sys.argv)
    if command_line == "merge":
        merge(sys.argv[2:])


if __name__ == "__main__":
    main()