diff options
Diffstat (limited to 'util.py')
-rwxr-xr-x | util.py | 293 |
1 files changed, 293 insertions, 0 deletions
@@ -0,0 +1,293 @@ +#!/usr/bin/env python3 +import os +import sys +import hashlib +import shutil +import re +import collections +import urllib.parse +import multiprocessing +from configparser import ConfigParser + +import requests + + +def load_config(filename="pack.ini"): + config = ConfigParser() + config.read(filename) + config["pack"]["sanitized_name"] = sanitize_text(config["pack"]["name"]) + #TODO generate a default pack location + config["pack"]["location"] = f"/tmp/{config['pack']['sanitized_name']}" + # return the whole config file, pack configuration and modlist + return config + +# take a string and only keep filename-friendly parts +def sanitize_text(text): + sanitized = "" + replacement_map = {" ": "-"} + for char in text: + if char.isalnum(): + sanitized += char.lower() + elif char in replacement_map: + sanitized += replacement_map[char] + return sanitized + + +def read_file(fil): + """ + Given a filename, read its contents in as a list of tuples. + This function strips out comment lines and whitespaces. + """ + strings = [] + with open(fil) as f: + for line in f: + string = line.strip().split() + if len(line) > 1 and line[0] != '#': + # run strip on each element + string = tuple(map(lambda x: x.strip(), string)) + strings.append(string) + + return strings + +def get_version_from_file(fil): + with open(fil) as f: + for line in f: + if line.strip().split()[0] == "#VERSION": + return int(line.strip().split()[1]) + return 0 + +def game_version_from_string(string): + if string is not None: + try: + return tuple(int(x) for x in string.split('.')) + except: + pass + return (2, 0, 0) + + +# Apply updates to the actual mod pack +def install(version_file, whitelist_file, pack_location): + pack_version = get_version_from_file(version_file) + print("Updating pack with version " + str(pack_version) + "...") + print() + # (fname, checksum, url) + mods = read_file(version_file) + names = [mod[0] for mod in mods] + # whitelist client mods (e.g. optifine) + names += [line[0] for line in read_file(whitelist_file)] + + i = 0 + for mod in mods: + mod_path = os.path.join(pack_location, mod[0]) + i += 1 + if os.path.exists(mod_path) and os.path.isfile(mod_path) and \ + hashlib.sha1(open(mod_path, 'rb').read()).hexdigest() == mod[1]: + print("Skipping {mod[0]}, already up to date".format(mod=mod)) + else: + print('Installing {mod[0]} from {mod[2]}...'.format(mod=mod)) + print(' ({i} of {x})'.format(i=i,x=len(mods)), end='\r') + download_obj = requests.get(mod[2], stream=True) + with open(mod_path, "wb") as write_file: + shutil.copyfileobj(download_obj.raw, write_file) + print("Done!" + " " * 8) + + print() + print("Removing old mods...") + for jar in os.listdir(pack_location): + if jar not in names and os.path.splitext(jar)[1] == ".jar": + os.remove(os.path.join(pack_location, jar)) + print("Removing '{jar}'".format(jar=jar)) + + print() + print("Finished installing mods!") + + +# Using the latest urls, update downloads.txt to match and have the correct sha1 +def apply_updates(mods, version_file, game_version=(2, 0, 0)): + pack_version = get_version_from_file(version_file) + print("Populating version file...") + print("Getting new versions of all mods...") + mod_urls = find_updated_urls([x for x in mods.values()], game_version, threads=3) + print("Downloading and checksumming all mods...") + checksums = find_checksums(mod_urls) + + # Write information out to version.txt + with open(version_file, 'w') as f: + f.write('# Format: <jarname> <hex digested sha1> <direct download url>\n') + f.write("#VERSION " + str(pack_version + 1) + "\n") + for name, checksum, url in zip((k+'.jar' for k in mods.keys()), checksums, mod_urls): + f.write(f'{name} {checksum} {url}\n') + + print() + print("Done!") + print(f"Updates applied to {version_file}") + print("New pack version is " + str(pack_version + 1)) + print("[!] No mods were installed. To update your mods folder, run 'update.py install'") + + +# Find if any updates are available +def check_updates(mods, version_file, version=(2, 0, 0)): + pack_version = get_version_from_file(version_file) + print("Checking for updates to version " + str(pack_version) + "...") + latest = [(k, mods[k]) for k in mods.keys()] + old = read_file(version_file) + old_urls = [mod[2] for mod in old] + num_updates = 0 + + print("Checking updates...") + ffx = firefox() + + for mod in latest: + print("Checking for updates to {mod[0]}...".format(mod=mod), end="") + sys.stdout.flush() # takes care of line-buffered terminals + if 'curseforge' in mod[1]: + url = find_cdn(ffx, mod[1], version) + else: + url = requests.get(mod[1]).url + if url in old_urls: + print(" No updates") + else: + print(" Found update: " + url.split('/')[-1]) + num_updates += 1 + ffx.close() + + print("Finished checking for updates. {num} mods can be updated".format(num=num_updates)) + if num_updates >= 0: + print("Run 'python update.py apply_updates' to create a new version with these updates applied.") + + +def threaded_find_url(homepage_url, game_version): + """ + Helper function that finds a single mod URL based on the homepage. + """ + if 'curseforge' in homepage_url: + ffx = firefox() + final_url = find_cdn(ffx, homepage_url, game_version) + ffx.close() + else: + final_url = requests.get(homepage_url).url + return final_url + + +def find_updated_urls(forge_urls, game_version, threads=20): + """ + Given a list of mod homepage URLs, find all of their direct download links in parallel. + """ + + # First, check that we can successfully open a Firefox instance in the main thread. + # This provides us with a much nicer error message and quicker feedback. + f = firefox() + f.close() + + with multiprocessing.Pool(threads) as pool: + # No progress indicator possible + # return pool.map(threaded_find_url, forge_urls) + + # Much longer, but allows us to do a nice progress indicator + result_futures = [] + for url in forge_urls: + result_futures.append(pool.apply_async(threaded_find_url, (url, game_version))) + + results = [] + for i,f in enumerate(result_futures): + results.append(f.get()) + print(f'\r{i+1}/{len(result_futures)} URLs updated ({round((i+1)/len(result_futures)*100)}%)', end='') + print() + + return results + + +def threaded_calc_sha1(direct_url): + """ + Helper function that downloads and calculates a single SHA1 hash from a direct download URL. + """ + resp = requests.get(direct_url) + hsh = hashlib.sha1(resp.content).hexdigest() + return hsh + + +def find_checksums(direct_urls, threads=8): + """ + Given a list of direct download URLs, download them all and calculate the SHA1 checksum of the file at that location. + """ + + with multiprocessing.Pool(threads) as pool: + # Much longer, but allows us to do a nice progress indicator + result_futures = [] + for url in direct_urls: + result_futures.append(pool.apply_async(threaded_calc_sha1, (url,))) + + results = [] + for i,f in enumerate(result_futures): + results.append(f.get()) + print(f'\r{i+1}/{len(result_futures)} checksums calculated ({round((i+1)/len(result_futures)*100)}%)', end='') + print() + + return results + + +def find_cdn(ffx, url, version): + """ + Given a mod home URL, finds the most up-to-date mod version compatible with the given game version. + Returns the direct Forge CDN download URL + """ + try: + # This goes to the "all files" page, where we get a table view of all + ffx.get(url + '/files/all') + mod_versions = ffx.find_elements_by_class_name("listing")[0].find_elements_by_xpath("tbody/tr") # extract the table of files from the page + row_info = collections.namedtuple("row_info", ["type", "filename", "cdn_id", "game_version"]) # create a custom tuple because data + rows = [] + for version_entry in mod_versions: + # parse out the four fields that we use + entry_cells = version_entry.find_elements_by_tag_name("td") + release_type = entry_cells[0].text + # Note that this is NOT the final filename - this is just the "release name". + filename = urllib.parse.quote(entry_cells[1].find_elements_by_tag_name("a")[0].text) + try: + game_version = tuple([int(x) for x in entry_cells[4].find_element_by_class_name("mr-2").text.split(".")]) # get game version and convert to tuple + except: + game_version = (0, 0, 0) + cdn_id = entry_cells[1].find_element_by_tag_name("a").get_property("href").split("/")[-1] + + #TODO make this configurable + if 'fabric' not in filename.lower() or 'forge' in filename.lower(): + rows.append(row_info(release_type, filename, cdn_id, game_version)) + rows.sort(key=lambda x: x.game_version, reverse=True) + best_row = next(x for x in rows if x.game_version <= version) + + # We need to find the real, ForgeCDN compatible filename now by going to the file page. + ffx.get(f'{url}/files/{best_row.cdn_id}') + # This will probably break in the future + filename = ffx.find_elements_by_xpath("html/body/div/main/div/div/section/div/div/div/section/section/article/div/div/span")[1].text + # URL escape the filename! + filename = urllib.parse.quote(filename) + + # ForgeCDN requires that the leading zeroes are stripped from each portion of the CDN ID, hence the int() cast. + return f'https://media.forgecdn.net/files/{int(best_row.cdn_id[:4])}/{int(best_row.cdn_id[4:])}/{filename}' + + except: + print(url) + open('temp.txt', 'a').write(url) + import traceback; traceback.print_exc() + return None + + +def firefox(): + """ + Start a headless Firefox instance and return the Selenium refrence to it. + """ + #print("Starting Selenium...") + try: + from selenium.webdriver import Firefox + from selenium.webdriver.firefox.options import Options + except: + print("Applying updates requires the `selenium` package") + exit(0) + options = Options() + options.add_argument('-headless') + options.add_argument('--window-size 1920,1080') + #for ~~cursed~~ windows people, put geckodriver in this folder + if(os.path.exists("./geckodriver")): + return Firefox(executable_path='./geckodriver', options=options) + return Firefox(options=options) + |