#!/usr/bin/env python3
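"""
Mod pack update tool.

Resolves the newest direct download URL for each mod (scraping CurseForge with
headless Firefox where needed), records the results and their SHA1 checksums in
a version file, and installs/removes jars in the local pack folder to match it.
Configuration is read from pack.ini and local-config.ini.
"""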
import os
import sys
import hashlib
import shutil
import re
import collections
import urllib.parse
import multiprocessing
from configparser import ConfigParser
import requests
def load_config():
"""
    Load configuration from the pack and local configuration files.
Fill in reasonable defaults where applicable.
"""
config_p = ConfigParser()
config_p.read(["pack.ini", "local-config.ini"])
    # Flatten the parsed configuration into a plain dict of dicts.
    config = {section: dict(config_p[section]) for section in config_p.sections()}
config["pack"]["sanitized_name"] = sanitize_text(config["pack"]["name"])
if "whitelist" not in config["pack"]:
config["pack"]["whitelist"] = []
else:
config["pack"]["whitelist"] = config["pack"]["whitelist"].split(",")
config["pack"]["game_version"] = game_version_from_string(config["pack"]["game_version"])
#TODO generate a default pack location
if "location" not in config["pack"]:
config["pack"]["location"] = f"/tmp/{config['pack']['sanitized_name']}"
    # Return the whole parsed configuration: the pack settings plus any other sections (e.g. the mod list)
return config
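# Illustrative pack.ini layout, inferred from the keys read above (the real
# file may contain more sections, e.g. the mod list used by the update commands):
#   [pack]
#   name = Example Pack
#   game_version = 1.16.5
#   whitelist = OptiFine.jar
#   location = /path/to/minecraft/mods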
# take a string and only keep filename-friendly parts
def sanitize_text(text):
sanitized = ""
replacement_map = {" ": "-"}
for char in text:
if char.isalnum():
sanitized += char.lower()
elif char in replacement_map:
sanitized += replacement_map[char]
return sanitized
def read_file(fil):
"""
    Given a filename, read its contents as a list of whitespace-split tuples.
    Comment lines (starting with '#') and blank lines are skipped.
"""
strings = []
with open(fil) as f:
        for line in f:
            stripped = line.strip()
            # Skip blank lines and comment lines (including the #VERSION marker).
            if stripped and not stripped.startswith('#'):
                strings.append(tuple(stripped.split()))
return strings
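# Example version file entry in the format written by apply_updates()
# (values are illustrative):
#   somemod-1.2.3.jar 3f786850e387550fdab836ed7e6dc881de23001b https://media.forgecdn.net/files/1234/567/somemod-1.2.3.jar
# read_file() returns it as the tuple (jarname, sha1, url).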
def get_version_from_file(fil):
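    """
    Return the pack version from the '#VERSION <n>' line of the given file,
    or 0 if no such line exists.
    """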
    with open(fil) as f:
        for line in f:
            parts = line.strip().split()
            if parts and parts[0] == "#VERSION":
                return int(parts[1])
    return 0
def game_version_from_string(string):
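    """
    Parse a version string such as "1.16.5" into a tuple of ints.
    Falls back to (2, 0, 0) when the string is missing or malformed.
    """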
if string is not None:
try:
return tuple(int(x) for x in string.split('.'))
        except ValueError:
pass
return (2, 0, 0)
# Apply updates to the actual mod pack
def install(version_file, whitelist, pack_location):
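    """
    Sync the mods folder at pack_location with version_file: download new or
    changed jars, skip jars whose SHA1 already matches, and delete jars that
    are neither listed in the version file nor whitelisted.
    """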
pack_version = get_version_from_file(version_file)
print("Updating pack with version " + str(pack_version) + "...")
print()
# (fname, checksum, url)
mods = read_file(version_file)
names = [mod[0] for mod in mods]
# whitelist client mods (e.g. optifine)
names += whitelist
    for i, mod in enumerate(mods, start=1):
        mod_path = os.path.join(pack_location, mod[0])
        # A mod is up to date when a local file with a matching SHA1 already exists.
        up_to_date = False
        if os.path.isfile(mod_path):
            with open(mod_path, 'rb') as existing:
                up_to_date = hashlib.sha1(existing.read()).hexdigest() == mod[1]
        if up_to_date:
print("Skipping {mod[0]}, already up to date".format(mod=mod))
else:
print('Installing {mod[0]} from {mod[2]}...'.format(mod=mod))
print(' ({i} of {x})'.format(i=i,x=len(mods)), end='\r')
            download_obj = requests.get(mod[2], stream=True)
            download_obj.raise_for_status()
            # Decode any transport compression before streaming the jar to disk.
            download_obj.raw.decode_content = True
            with open(mod_path, "wb") as write_file:
                shutil.copyfileobj(download_obj.raw, write_file)
print("Done!" + " " * 8)
print()
print("Removing old mods...")
for jar in os.listdir(pack_location):
if jar not in names and os.path.splitext(jar)[1] == ".jar":
os.remove(os.path.join(pack_location, jar))
print("Removing '{jar}'".format(jar=jar))
print()
print("Finished installing mods!")
# Using the latest URLs, rewrite the version file so each entry has the newest download link and its correct sha1
def apply_updates(mods, version_file, game_version=(2, 0, 0)):
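    """
    Resolve the newest direct download URL for every mod, checksum each file,
    and write a new version file with the pack version bumped by one.
    Nothing is installed; run the install step separately.
    """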
pack_version = get_version_from_file(version_file)
print("Populating version file...")
print("Getting new versions of all mods...")
    mod_urls = find_updated_urls(list(mods.values()), game_version, threads=3)
print("Downloading and checksumming all mods...")
checksums = find_checksums(mod_urls)
# Write information out to version.txt
with open(version_file, 'w') as f:
f.write('# Format: <jarname> <hex digested sha1> <direct download url>\n')
f.write("#VERSION " + str(pack_version + 1) + "\n")
for name, checksum, url in zip((k+'.jar' for k in mods.keys()), checksums, mod_urls):
f.write(f'{name} {checksum} {url}\n')
print()
print("Done!")
print(f"Updates applied to {version_file}")
print("New pack version is " + str(pack_version + 1))
print("[!] No mods were installed. To update your mods folder, run 'update.py install'")
# Find if any updates are available
def check_updates(mods, version_file, version=(2, 0, 0)):
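    """
    Compare the newest direct download URL of each mod against the URLs already
    recorded in version_file and report how many mods have updates available.
    """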
pack_version = get_version_from_file(version_file)
print("Checking for updates to version " + str(pack_version) + "...")
    latest = list(mods.items())
old = read_file(version_file)
old_urls = [mod[2] for mod in old]
num_updates = 0
print("Checking updates...")
ffx = firefox()
for mod in latest:
print("Checking for updates to {mod[0]}...".format(mod=mod), end="")
sys.stdout.flush() # takes care of line-buffered terminals
if 'curseforge' in mod[1]:
url = find_cdn(ffx, mod[1], version)
else:
url = requests.get(mod[1]).url
if url in old_urls:
print(" No updates")
else:
print(" Found update: " + url.split('/')[-1])
num_updates += 1
ffx.close()
print("Finished checking for updates. {num} mods can be updated".format(num=num_updates))
    if num_updates > 0:
print("Run 'python update.py apply_updates' to create a new version with these updates applied.")
def threaded_find_url(homepage_url, game_version):
"""
Helper function that finds a single mod URL based on the homepage.
"""
if 'curseforge' in homepage_url:
ffx = firefox()
final_url = find_cdn(ffx, homepage_url, game_version)
ffx.close()
else:
final_url = requests.get(homepage_url).url
return final_url
def find_updated_urls(forge_urls, game_version, threads=20):
"""
Given a list of mod homepage URLs, find all of their direct download links in parallel.
"""
# First, check that we can successfully open a Firefox instance in the main thread.
# This provides us with a much nicer error message and quicker feedback.
f = firefox()
f.close()
with multiprocessing.Pool(threads) as pool:
        # A single pool.starmap() call would be shorter, but submitting jobs
        # individually with apply_async lets us print a progress indicator.
result_futures = []
for url in forge_urls:
result_futures.append(pool.apply_async(threaded_find_url, (url, game_version)))
results = []
for i,f in enumerate(result_futures):
results.append(f.get())
print(f'\r{i+1}/{len(result_futures)} URLs updated ({round((i+1)/len(result_futures)*100)}%)', end='')
print()
return results
def threaded_calc_sha1(direct_url):
"""
Helper function that downloads and calculates a single SHA1 hash from a direct download URL.
"""
resp = requests.get(direct_url)
hsh = hashlib.sha1(resp.content).hexdigest()
return hsh
def find_checksums(direct_urls, threads=8):
"""
Given a list of direct download URLs, download them all and calculate the SHA1 checksum of the file at that location.
"""
with multiprocessing.Pool(threads) as pool:
        # Submit jobs individually with apply_async (rather than pool.map) so we can print a progress indicator.
result_futures = []
for url in direct_urls:
result_futures.append(pool.apply_async(threaded_calc_sha1, (url,)))
results = []
for i,f in enumerate(result_futures):
results.append(f.get())
print(f'\r{i+1}/{len(result_futures)} checksums calculated ({round((i+1)/len(result_futures)*100)}%)', end='')
print()
return results
def find_cdn(ffx, url, version):
"""
Given a mod home URL, finds the most up-to-date mod version compatible with the given game version.
    Returns the direct ForgeCDN download URL, or None on failure.
"""
try:
        # This goes to the "all files" page, where we get a table view of all uploaded files
ffx.get(url + '/files/all')
mod_versions = ffx.find_elements_by_class_name("listing")[0].find_elements_by_xpath("tbody/tr") # extract the table of files from the page
        row_info = collections.namedtuple("row_info", ["type", "filename", "cdn_id", "game_version"]) # named fields for the data we pull out of each table row
rows = []
for version_entry in mod_versions:
# parse out the four fields that we use
entry_cells = version_entry.find_elements_by_tag_name("td")
release_type = entry_cells[0].text
# Note that this is NOT the final filename - this is just the "release name".
filename = urllib.parse.quote(entry_cells[1].find_elements_by_tag_name("a")[0].text)
try:
game_version = tuple([int(x) for x in entry_cells[4].find_element_by_class_name("mr-2").text.split(".")]) # get game version and convert to tuple
            except Exception:
                # Missing or unparsable game-version cell; treat it as oldest.
game_version = (0, 0, 0)
cdn_id = entry_cells[1].find_element_by_tag_name("a").get_property("href").split("/")[-1]
#TODO make this configurable
if 'fabric' not in filename.lower() or 'forge' in filename.lower():
rows.append(row_info(release_type, filename, cdn_id, game_version))
rows.sort(key=lambda x: x.game_version, reverse=True)
best_row = next(x for x in rows if x.game_version <= version)
# We need to find the real, ForgeCDN compatible filename now by going to the file page.
ffx.get(f'{url}/files/{best_row.cdn_id}')
# This will probably break in the future
filename = ffx.find_elements_by_xpath("html/body/div/main/div/div/section/div/div/div/section/section/article/div/div/span")[1].text
# URL escape the filename!
filename = urllib.parse.quote(filename)
# ForgeCDN requires that the leading zeroes are stripped from each portion of the CDN ID, hence the int() cast.
return f'https://media.forgecdn.net/files/{int(best_row.cdn_id[:4])}/{int(best_row.cdn_id[4:])}/{filename}'
    except Exception:
        # Log the failing URL for later inspection; the caller receives None for this mod.
        print(url)
        with open('temp.txt', 'a') as log:
            log.write(url + '\n')
        import traceback; traceback.print_exc()
return None
def firefox():
"""
    Start a headless Firefox instance and return the Selenium reference to it.
"""
#print("Starting Selenium...")
try:
from selenium.webdriver import Firefox
from selenium.webdriver.firefox.options import Options
    except ImportError:
        print("Applying updates requires the `selenium` package")
        sys.exit(1)
options = Options()
options.add_argument('-headless')
    # Firefox takes separate width/height arguments; '--window-size' is a Chromium flag.
    options.add_argument('--width=1920')
    options.add_argument('--height=1080')
#for ~~cursed~~ windows people, put geckodriver in this folder
    if os.path.exists("./geckodriver"):
return Firefox(executable_path='./geckodriver', options=options)
return Firefox(options=options)
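# Minimal CLI sketch (an assumption: the wiring for the 'update.py install' and
# 'apply_updates' commands mentioned above is not shown here, and a [mods]
# section mapping mod names to homepage URLs is assumed in the config):
#
# if __name__ == "__main__":
#     config = load_config()
#     mods = dict(config.get("mods", {}))
#     command = sys.argv[1] if len(sys.argv) > 1 else "check_updates"
#     if command == "install":
#         install("version.txt", config["pack"]["whitelist"], config["pack"]["location"])
#     elif command == "apply_updates":
#         apply_updates(mods, "version.txt", config["pack"]["game_version"])
#     else:
#         check_updates(mods, "version.txt", config["pack"]["game_version"])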