import datetime
from functools import reduce
import json
from operator import getitem
import sys
import os
import itertools

# /home/pakin/Codes/coffCfg/cofffeemachineConfig/coffeethai02_1550.json


def GetDateTimeString():
    """Return the current local date and time as ``DD-Mon-YYYY, HH:MM:SS``.

    The same format string (with the comma) is one of the two layouts that
    ``string_to_date`` accepts.
    """
    now = datetime.datetime.now()  # current date and time
    return now.strftime("%d-%b-%Y, %H:%M:%S")


HomeDirectory = "/home/pakin"
CoffeeRecipeDirectory = "/home/pakin/Codes/coffCfg/cofffeemachineConfig"
valid_len_product_code = len("12-01-02-0001")

# events - every action performed by merge, as plain-text log lines
events = []
# events_json - the same actions as dictionaries (see create_map)
events_json = []
# last_change - current value of "LastChange"
last_change = ""
# last_change_path - key path to the "LastChange" key, joined by '.'; it can be
# split again to access the value in the nested map
last_change_path = ""
# master_json - main dictionary (map) that another json is merged into
master_json = {}
# config_ver - target main dictionary's "configNumber"
# (assigned at module scope, so no `global` declaration is needed here)
config_ver = -1
# holdonPD - the product code currently being held
holdonPD = ""
# isJson - output config flag; merge() sets it when the change file is json
isJson = False
# Keys to be removed
removed_keylist = [
    "RemainingCups",
    "RemainingCupsWithTopping",
]

# Set value of nested map (target - master_json) by using keys from 'key_list' and assigned value to it.
def set_value_in_nested_map(key_list: list, value):
    """Assign ``value`` inside the module-level ``master_json``.

    :param key_list: path of keys/indexes; every element but the last is
        walked with ``getitem`` and the last one is the key that is assigned.
    :param value: value stored at that path.
    :raises KeyError, IndexError, TypeError: if an intermediate step of the
        path does not exist.
    """
    parent = reduce(getitem, key_list[:-1], master_json)
    parent[key_list[-1]] = value


def get_value_in_nested_map(target_json: dict, key_list: list, isMaster=False):
    """Look up the value at ``key_list`` inside ``target_json``.

    :param target_json: nested dict/list structure to read from.
    :param key_list: path of keys/indexes. NOTE: when ``isMaster`` is true and
        the path contains ``"Recipe01"``, ``key_list[1]`` is shifted in place
        by the module-level ``shared_master_position`` (not defined in the
        part of the file reviewed here), so the caller's list is modified.
    :param isMaster: enable the index shift described above.
    :return: the value found; ``"-"`` when the path goes through ``"SubMenu"``
        and the container at ``key_list[:3]`` is empty; ``None`` when the
        final lookup fails.
    """
    if isMaster and "Recipe01" in key_list:
        key_list[1] += shared_master_position

    if "SubMenu" in key_list:
        # This probe is intentionally outside the try below, as in the
        # original: a broken path up to the sub-menu still raises.
        if len(reduce(getitem, key_list[:3], target_json)) <= 0:
            return "-"

    # Handle no key case: report the path and fall back to None.
    result = None
    try:
        result = reduce(getitem, key_list[:-1], target_json)[key_list[-1]]
    except (KeyError, IndexError, TypeError):
        # Only lookup failures are swallowed; anything else propagates.
        print(key_list, "->", result)
    return result


# Decode key that was in form of String to the list
def decode_path(str_with_dot: str) -> list:
    """Split a dotted path into its parts, turning all-digit parts into ints."""
    return [
        int(part) if part.isdigit() else part
        for part in str_with_dot.split('.')
    ]


def fetch_pd(str_path: str, target_dict: dict) -> str:
    """Return ``productCode`` (paths containing "Recipe01") or ``id`` of the entry at ``str_path``."""
    keyList = decode_path(str_with_dot=str_path)
    keyList.append("productCode" if "Recipe01" in str_path else "id")
    return get_value_in_nested_map(target_json=target_dict, key_list=keyList)


def fetch_matId(str_path: str, target_dict: dict) -> str:
    """Return the ``materialPathId`` field of the entry at ``str_path``."""
    keyList = decode_path(str_with_dot=str_path)
    keyList.append("materialPathId")
    return get_value_in_nested_map(target_json=target_dict, key_list=keyList)


def fetch_defaultId(str_path: str, target_dict: dict) -> str:
    """Return the ``defaultIDSelect`` field of the entry at ``str_path``."""
    keyList = decode_path(str_with_dot=str_path)
    keyList.append("defaultIDSelect")
    return get_value_in_nested_map(target_json=target_dict, key_list=keyList)


def fetch_onlyMainMenuPath(str_with_dot: str):
    """Return the first two components of a dotted path, re-joined with '.'."""
    mainpath = decode_path(str_with_dot)[:2]
    return ".".join(str(p) for p in mainpath)


def create_map(events_action: str, log: str, additional=None) -> dict:
    """Build one event record for the json log.

    :param events_action: stored under ``"event"``.
    :param log: stored under ``"log"``.
    :param additional: stored under ``"addition"``; a fresh empty list is used
        when omitted so records never share one default list.
    :return: dict with ``timestamp``, ``event``, ``log`` and ``addition``.
    """
    return {
        "timestamp": GetDateTimeString(),
        "event": events_action,
        "log": log,
        "addition": [] if additional is None else additional,
    }


def flatten(x, parent='', sep='.') -> dict:
    """Flatten a nested dict into a single-level dict keyed by joined paths.

    Dict values are flattened recursively. List values are expanded by index;
    dict items inside a list are flattened recursively, any other item
    (including a nested list) is stored as-is under its index key. Empty dicts
    and empty lists therefore produce no key at all.

    :param x: dict to flatten.
    :param parent: key prefix used during recursion.
    :param sep: separator placed between path components.
    """
    flattened = {}
    for key, value in x.items():
        current_key = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            flattened.update(flatten(value, current_key, sep))
        elif isinstance(value, list):
            for index, item in enumerate(value):
                list_key = f"{current_key}{sep}{index}"
                if isinstance(item, dict):
                    flattened.update(flatten(item, list_key, sep))
                else:
                    flattened[list_key] = item
        else:
            flattened[current_key] = value
    return flattened


def unflatten(dictionary):
    """Rebuild a nested structure from a dict produced by ``flatten`` (sep '.').

    The container created for a path component is chosen from the component
    that follows it: an all-digit successor means a list, anything else a
    dict. The root is always a dict and keeps its keys as strings. A dict
    whose keys are digit strings cannot be told apart from a list in the
    flattened form and comes back as a list.

    :param dictionary: mapping of dotted string paths to values.
    :return: the nested dict.
    """
    result_dict = {}
    for key, value in dictionary.items():
        parts = key.split('.')
        last_position = len(parts) - 1
        node = result_dict
        for position, part in enumerate(parts):
            if isinstance(node, list):
                # Lists are addressed by integer index; pad so the slot exists.
                slot = int(part)
                while len(node) <= slot:
                    node.append(None)
                child = node[slot]
            else:
                slot = part
                child = node.get(slot)

            if position == last_position:
                node[slot] = value
                break

            if child is None:
                child = [] if parts[position + 1].isdigit() else {}
                node[slot] = child
            node = child
    return result_dict


def diff(dict1, dict2):
    """Compare two flat dicts.

    :return: tuple ``(diff1, diff2, diffval)`` where ``diff1`` holds the
        entries whose key exists only in ``dict1``, ``diff2`` those only in
        ``dict2``, and ``diffval`` maps each shared key with unequal values to
        ``(dict1 value, dict2 value)``, inserted in sorted key order.
    """
    keys1, keys2 = set(dict1), set(dict2)
    changed_keys = sorted(
        key for key in keys1 & keys2 if dict1[key] != dict2[key]
    )

    diff1 = {key: dict1[key] for key in keys1 - keys2}
    diff2 = {key: dict2[key] for key in keys2 - keys1}
    diffval = {key: (dict1[key], dict2[key]) for key in changed_keys}
    return diff1, diff2, diffval


def get_key(d, target) -> str:
    """Return the first key of ``d`` whose value equals ``target``, or ``None``."""
    for key, value in d.items():
        if value == target:
            return key
    return None


def remove_key(d, key):
    """Delete ``key`` from ``d`` in place.

    For a dict, the key is removed and every other value is visited
    recursively. For a list, only its dict items are handled: the key is
    removed from the item itself and from each dict found in the item's
    ``"SubMenu"`` list; deeper levels of list items are not visited.
    """
    if isinstance(d, dict):
        for key1 in list(d.keys()):
            if key1 == key:
                del d[key1]
            else:
                remove_key(d[key1], key)
    elif isinstance(d, list):
        for item in d:
            if not isinstance(item, dict):
                continue
            item.pop(key, None)
            for sub_item in item.get("SubMenu", []):
                # Skip malformed (non-dict) sub-menu entries instead of crashing.
                if isinstance(sub_item, dict):
                    sub_item.pop(key, None)


def string_to_date(str_date: str):
    """Parse a ``DD-Mon-YYYY[,] HH:MM:SS`` string and return its time of day.

    Both the layout with a comma after the year and the one without are
    accepted.

    NOTE(review): only the ``datetime.time`` part is returned, so values
    compared through ``CompareDate`` ignore the calendar date. That looks
    unintended; confirm before switching to full datetimes, since it would
    change merge decisions.
    """
    layout = "%d-%b-%Y, %H:%M:%S" if "," in str_date else "%d-%b-%Y %H:%M:%S"
    return datetime.datetime.strptime(str_date, layout).time()


def CompareDate(master_lastchange, dev_lastchange) -> str:
    """Classify dev against master.

    :return: ``"DevLegacy"`` when master is greater, ``"NoChange"`` when both
        are equal, ``"DevRelease"`` when dev is greater.
    """
    if master_lastchange > dev_lastchange:
        return "DevLegacy"
    if master_lastchange == dev_lastchange:
        return "NoChange"
    if master_lastchange < dev_lastchange:
        return "DevRelease"


def GetMasterDevCompare(key_list: str, dev_date: str) -> bool:
    """Return True when ``dev_date`` is newer than the value stored in ``master_json``.

    :param key_list: dotted path (it is passed to ``decode_path``) of the
        value inside the module-level ``master_json``; the lookup runs with
        ``isMaster=True``.
    :param dev_date: the dev side's value, in a format ``string_to_date``
        accepts.
    """
    master_date = string_to_date(
        get_value_in_nested_map(master_json, decode_path(key_list), True)
    )
    comp = CompareDate(master_date, string_to_date(dev_date))
    if comp == "DevLegacy" or comp == "NoChange":
        return False
    elif comp == "DevRelease":
        return True


# ------------------------------------ HTML --------------------------------------
def blinking_text():
    """Return the snippet prepended to a newly created html log.

    NOTE(review): the visible source shows only whitespace inside this
    literal; the original content (presumably the markup behind the "blink"
    class used by ``p``) is not visible here - restore it from the real file.
    """
    return """ """


def p(content: str, isAction: bool) -> str:
    """Wrap one log entry in a styled paragraph for the html log.

    The background (and, for some entries, extra decoration or the "blink"
    class) is selected from keywords found in ``content``.

    :param content: text of the log entry.
    :param isAction: True for action entries ("CHANGE"/"INSERT" colouring),
        False for the informational kinds.
    """
    pclass = ""
    style = "style=\"float: left; margin: 15; padding: 10px; "

    if isAction:
        if "CHANGE" in content:
            style += " background: greenyellow; "
        elif "INSERT" in content:
            style += " background: orange; "
    else:
        if "VER" in content:
            style += " background: aliceblue; "
        elif "COUNT" in content or "OUTPUT" in content or "LOG" in content:
            style += " background: aquamarine; "
        elif "master" in content or "dev" in content:
            style += " box-shadow: 2px 2px; "
            if "master" in content:
                style += "background: lightgray; margin-right: -20px;"
            else:
                style += "background: lightblue;"
        elif "New key" in content or "New value" in content:
            style += " border-radius: 25px; "
            pclass = "class=\"blink\" "
            if "New key" in content:
                style += "background: lightgreen;"
            else:
                style += "background: gold;"

    # Close the style attribute and the opening tag.
    style += "\">"
    # NOTE(review): in the visible source the text between the two tabs and
    # the trailing newline of this literal is missing. The <p ...> wrapper is
    # reconstructed from how `pclass` and `style` are built above - confirm
    # against the original file.
    return "\t\t<p " + pclass + style + content + "</p>\n"
\n" def searchFnForHtml(): return """ """ def searchInputForHTML(): return """ """ def mergeV3pre(flat_dict_base:dict, flat_dict_dev:dict) -> dict: """ `version 3 preview` Merge 2 dictionary files together. Given that the dict must be flattened. `Warning` This will overwrite the exist key with new value. """ merged = flat_dict_base.copy() merged.update(flat_dict_dev) return merged # Merge diff value or append it to the main file. # 0 - master_path = master.json / to be merged # 1 - dev_path = dev.json / to merge into another # 2 - outfile = output # 3 - changefile = track log # 4 - debug = enable debug def merge(args): """ `version 2` Merge the 2 json files together with input of target paths, output path and changefile :param args: A list of arguments. list[] :type args: list where each element position; 0 - master_path = master.json / to be merged 1 - dev_path = dev.json / to merge into another 2 - outfile = merged json output 3 - changefile = track log (json) --- generate by merge to html,log,json 4 - debug = enable debug 5 - requester = requester's name sent by client :return: None :rtype: None """ print("ARGS: => ",args) master_path = args[0]; dev_path = args[1]; outfile_path = args[2]; changefile_path = args[3]; file_path, out_ext = os.path.splitext(changefile_path) if "json" in out_ext: global isJson isJson = True if len(args) > 4: global debug debug = args[4] if args[4] != None else False if len(args) > 5: global requester requester = args[5] if args[5] != None else "" # if (os.path.exists(master_path) == False) and str(master_path).isdigit(): if "/coffeethai02" not in master_path: master_path = CoffeeRecipeDirectory + "/coffeethai02_" + str(master_path) + ".json" master_file = open(master_path, 'rb') if (os.path.exists(dev_path) == False) and str(dev_path).isdigit(): if "/coffeethai02" not in dev_path: dev_path = CoffeeRecipeDirectory + "/coffeethai02_" + str(dev_path) + ".json" dev_file = open(dev_path, 'rb') masterName = master_file.name; devName = 
dev_file.name master_file = master_file.raw.readall() dev_file = dev_file.raw.readall() print("Master file size => ",os.stat(master_path).st_size) print("Dev file size => ",os.stat(dev_path).st_size) # # Requester events_json.append(create_map( events_action="REQUESTER", log="GetRequestFrom", additional=[{ "timestamp": GetDateTimeString(), "requester": requester }])) events.append(GetDateTimeString()+"\t[REQUEST]\t\tget request from \""+requester+"\"\n") events_json.append(create_map(events_action="MERGE", log=devName.split("/")[-1]+" into "+masterName.split("/")[-1])) events.append(GetDateTimeString()+"\t[MERGE]\t\tMerging "+devName.split("/")[-1]+" into "+masterName.split("/")[-1]+"\n") # print(GetDateTimeString()+"\tMerging "+dev_file.name.split("/")[-1]+" into "+master_file.name.split("/")[-1]+"\n") print(events[len(events) - 1]) # print("Test maintain => ", MaintianUnicodeEscapeDecoder.decoder(s=master_file)) global master_json global dev_json master_json_file:dict = json.loads(master_file) master_json = master_json_file.copy() dev_json_file:dict = json.loads(dev_file) dev_json = dev_json_file.copy() config_ver = master_json["MachineSetting"]["configNumber"] print(config_ver) global pdchange pdchange = 0 global pdadd pdadd = 0 # global holdonPD holdonPD = "" # Step 1: Flatten the dictionary flattened_master = flatten(master_json) flattened_dev = flatten(dev_json) # Step 2: Check key size of flattens try: if debug == "debug": print("master keys = ", len(flattened_master.keys()), ", dev keys = ", len(flattened_dev.keys())) except: pass # Step 3: Diff diff1, diff2, diffvals = diff(flattened_master, flattened_dev) # print("Exist in master = ", len(diff1)) # print("Exist in dev = ", len(diff2)) # print("Unmatched values = ", len(diffvals)) # m_pre = mergeV3pre(flat_dict_base=flattened_master, flat_dict_dev=flattened_dev) # final_m = unflatten(m_pre) # with open("./output/mergev3.json", "w+", encoding="utf-8") as testfile: # json.dump(final_m, testfile, indent=2, 
ensure_ascii=False) # Clean unused key for i, remove_item in enumerate(removed_keylist): remove_key(master_json, remove_item) merge_dicts(master_json, dev_json_file) try: if debug == "debug": print("/".join(changefile_path.split("/")[:-1])) except: pass if (os.path.exists("/".join(outfile_path.split("/")[:-1])) == False): os.makedirs("/".join(outfile_path.split("/")[:-1])) if (os.path.exists("/".join(changefile_path.split("/")[:-1])) == False): os.makedirs("/".join(changefile_path.split("/")[:-1])) with open(outfile_path, "w", encoding="utf-8") as outfile: json.dump(master_json, outfile, indent=2, ensure_ascii=False) # Create new filename by outfile_path outfile_path, outfile_ext = os.path.splitext(outfile_path) # ignore ext, get the last outfile_path_spl = str(outfile_path).split("/") path_version = outfile_path_spl[len(outfile_path_spl) - 1] pv = path_version.split("_")[1] # Include counts events_json.append(create_map(events_action="COUNT", log="Total="+str(pdchange))) events_json.append(create_map(events_action="COUNT", log="Total="+str(pdadd))) events_json.append(create_map(events_action="OUTPUT", log="Finished! write output to "+outfile_path)) events_json.append(create_map(events_action="LOG", log="Log is saved to "+file_path+"_"+pv+".json")) events.append(GetDateTimeString()+"\t[COUNT]\t\tTotal Change: "+str(pdchange)+"\n") events.append(GetDateTimeString()+"\t[COUNT]\t\tTotal Insert: "+str(pdadd)+"\n") events.append(GetDateTimeString()+"\t[OUTPUT]\t\tFinished! 
write output to "+outfile_path+"\n") events.append(GetDateTimeString()+"\t[LOG]\t\tLog is saved to "+file_path+"_"+pv+".json"+"\n") # log json file if not os.path.exists(file_path+"_"+pv+".json") or os.stat(file_path+"_"+pv+".json").st_size == 0: with open(file_path+"_"+pv+".json", "w",encoding="utf-8") as outlogfile: json.dump({"logs"+GetDateTimeString()+"*": events_json}, outlogfile, indent=2, ensure_ascii=False) else: print(file_path+"_"+pv+".json") logjs:dict = json.loads(open(file_path+"_"+pv+".json", encoding="utf-8").read()) logjs["logs"+GetDateTimeString()+"*"] = events_json json.dump(logjs, open(file_path+"_"+pv+".json", "w+", encoding="utf-8"), indent=2, ensure_ascii=False) # log file with open(file_path+"_"+pv+".log", "a+") as outlogfile2: try: for event in events: outlogfile2.write(event) except: raise Exception(event) # Create html version # add version of master to before extension html_string = "" if os.path.exists(file_path+"_"+pv+".html"): html_string = "" else: html_string = blinking_text()+searchFnForHtml()+"\n"+searchInputForHTML()+"\n" with open(file_path+"_"+pv+".html", "a+") as outlogHtml: html_string += "