Source code for GSASIIpath

# -*- coding: utf-8 -*-
#GSASIIpath - file location & update routines
########### SVN repository information ###################
# $Date: 2024-04-03 21:44:28 -0500 (Wed, 03 Apr 2024) $
# $Author: toby $
# $Revision: 5775 $
# $URL: https://subversion.xray.aps.anl.gov/pyGSAS/trunk/GSASIIpath.py $
# $Id: GSASIIpath.py 5775 2024-04-04 02:44:28Z toby $
########### SVN repository information ###################
'''
:mod:`GSASIIpath` Classes & routines follow
'''

from __future__ import division, print_function
import os
import sys
import platform
import glob
import subprocess
import datetime as dt
try:
    import numpy as np
except ImportError:
    print("skipping numpy in GSASIIpath")
try:
    import requests
except:
    print('Python requests package not installed (required for web access')

# fix up path before using git. Needed when using conda without
#   activate (happens on MacOS w/GSAS-II.app)
pyPath = os.path.dirname(os.path.realpath(sys.executable))
if sys.platform != "win32" and pyPath not in os.environ['PATH'].split(':'):
    os.environ['PATH'] = pyPath + ':' + os.environ['PATH']
    
try:
    import git
except ImportError as msg:
    if 'Failed to initialize' in msg.msg:
        print('The gitpython package is unable to locate a git installation.')
        print('See https://gsas-ii.readthedocs.io/en/latest/packages.html for more information.')
    elif 'No module' in msg.msg:
        print('Python gitpython module not installed')
    else:
        print(f'gitpython failed to import, but why? Error:\n{msg}')
    print('Note: git and gitpython are required for GSAS-II to self-update')
except Exception as msg:
    print(f'git import failed with unexpected error:\n{msg}')
    print('Note: git and gitpython are required for GSAS-II to self-update')

# hard-coded github repo locations
G2binURL = "https://api.github.com/repos/AdvancedPhotonSource/GSAS-II-buildtools"
g2URL = "https://github.com/AdvancedPhotonSource/GSAS-II.git"
# tutorial repo owner & Repo name
gitTutorialOwn,gitTutorialRepo = 'AdvancedPhotonSource', 'GSAS-II-Tutorials'
    
path2GSAS2 = os.path.dirname(os.path.abspath(os.path.expanduser(__file__))) # location of this file; save before any changes in pwd

# convert version numbers as '1.2.3' to integers (1002) and back (to 1.2)
fmtver = lambda v: str(v//1000)+'.'+str(v%1000)
intver = lambda vs: sum([int(i) for i in vs.split('.')[0:2]]*np.array((1000,1)))

[docs] def GetConfigValue(key,default=None): '''Return the configuration file value for key or a default value if not present :param str key: a value to be found in the configuration (config.py) file :param default: a value to be supplied if none is in the config file or the config file is not found. Defaults to None :returns: the value found or the default. ''' try: return configDict.get(key,default) except NameError: # this happens when building docs return None
[docs] def SetConfigValue(parmdict): '''Set configuration variables from a dictionary where elements are lists First item in list is the default value and second is the value to use. ''' global configDict for var in parmdict: if var in configDict: del configDict[var] if isinstance(parmdict[var],tuple): configDict[var] = parmdict[var] else: if parmdict[var][1] is None: continue if parmdict[var][1] == '': continue if parmdict[var][0] == parmdict[var][1]: continue configDict[var] = parmdict[var][1]
[docs] def addPrevGPX(fil,configDict): '''Add a GPX file to the list of previous files. Move previous names to start of list. Keep most recent five files ''' fil = os.path.abspath(os.path.expanduser(fil)) if 'previous_GPX_files' not in configDict: return try: pos = configDict['previous_GPX_files'][1].index(fil) if pos == 0: return configDict['previous_GPX_files'][1].pop(pos) # if present, remove if not 1st except ValueError: pass except AttributeError: configDict['previous_GPX_files'][1] = [] files = list(configDict['previous_GPX_files'][1]) files.insert(0,fil) configDict['previous_GPX_files'][1] = files[:5]
# routines for looking a version numbers in files version = -1
[docs] def SetVersionNumber(RevString): '''Set the subversion (svn) version number :param str RevString: something like "$Revision: 5775 $" that is set by subversion when the file is retrieved from subversion. Place ``GSASIIpath.SetVersionNumber("$Revision: 5775 $")`` in every python file. ''' try: RevVersion = int(RevString.split(':')[1].split()[0]) global version version = max(version,RevVersion) except: pass
[docs] def LoadConfigFile(filename): '''Read a GSAS-II configuration file. Comments (starting with "%") are removed, as are empty lines :param str filename: base file name (such as 'file.dat'). Files with this name are located from the path and the contents of each are concatenated. :returns: a list containing each non-empty (after removal of comments) line found in every matching config file. ''' info = [] for path in sys.path: fil = os.path.join(path,'inputs',filename) if not os.path.exists(fil): # patch 3/2024 for svn dir organization fil = os.path.join(path,filename) if not os.path.exists(fil): continue try: i = 0 fp = open(fil,'r') for line in fp: expr = line.split('#')[0].strip() if expr: info.append(expr) i += 1 print(str(i)+' lines read from config file '+fil) finally: fp.close() return info
[docs] def GetBinaryPrefix(pyver=None): '''Creates the first part of the binary directory name such as linux_64_p3.9 (where the full name will be linux_64_p3.9_n1.21). Note that any change made here is also needed in GetBinaryDir in fsource/SConstruct ''' if sys.platform == "win32": prefix = 'win' elif sys.platform == "darwin": prefix = 'mac' elif sys.platform.startswith("linux"): prefix = 'linux' else: print(u'Unknown platform: '+sys.platform) raise Exception('Unknown platform') if 'arm' in platform.machine() and sys.platform == "darwin": bits = 'arm' elif 'aarch' in platform.machine() and '64' in platform.architecture()[0]: bits = 'arm64' elif 'arm' in platform.machine(): bits = 'arm32' elif '64' in platform.architecture()[0]: bits = '64' else: bits = '32' # format current python version if pyver: pyver = 'p'+pyver else: pyver = 'p{}.{}'.format(*sys.version_info[0:2]) return '_'.join([prefix,bits,pyver])
#============================================================================== #============================================================================== # hybrid routines that use git & svn (to be revised to remove svn someday) G2_installed_result = None
[docs] def HowIsG2Installed(): '''Determines if GSAS-II was installed with git, svn or none of the above. Result is cached to avoid time needed for multiple calls of this. :returns: * a string starting with 'git' from git, if installed from the GSAS-II GitHub repository (defined in g2URL), the string is 'github', if the post-3/2024 directory structure is in use '-rev' is appended. * or 'svn' is installed from an svn repository (assumed as defined in g2home) * or 'noVCS' if installed without a connection to a version control system ''' global G2_installed_result if G2_installed_result is not None: return G2_installed_result try: g2repo = openGitRepo(path2GSAS2) if os.path.samefile(os.path.dirname(g2repo.common_dir),path2GSAS2): rev = '' else: rev = '-rev' if g2URL in g2repo.remote().urls: return 'github'+rev G2_installed_result = 'git'+rev return G2_installed_result except: pass if svnGetRev(verbose=False): G2_installed_result = 'svn' return G2_installed_result G2_installed_result = 'noVCS' return G2_installed_result
[docs] def GetVersionNumber(): '''Obtain a version number for GSAS-II from git, svn or from the files themselves, if no other choice. This routine was used to get the GSAS-II version from strings placed in files by svn with the version number being the latest number found, gathered by :func:`SetVersionNumber` (not 100% accurate as the latest version might have files changed that are not tagged by svn or with a call to SetVersionNumber. Post-svn this info will not be reliable, and this mechanism is replaced by a something created with a git hook, file git_verinfo.py (see the git_filters.py file). Before resorting to the approaches above, try to ask git or svn directly. :returns: an int value usually, but a value of 'unknown' might occur ''' if HowIsG2Installed().startswith('git'): g2repo = openGitRepo(path2GSAS2) for h in list(g2repo.iter_commits('HEAD'))[:50]: # (don't go too far back) tags = g2repo.git.tag('--points-at',h).split('\n') try: for item in tags: if item.isnumeric(): return int(item) except: pass elif HowIsG2Installed() == 'svn': rev = svnGetRev() if rev is not None: return rev # No luck asking, look up version information from git_verinfo.py try: import git_verinfo as gv try: for item in gv.git_tags: if item.isnumeric(): return int(item) except: pass try: for item in gv.git_prevtags: if item.isnumeric(): return int(item) except: pass except: pass # all else failed, use the SetVersionNumber value if version > 5000: # a small number must be wrong return version else: return "unknown"
def getG2VersionInfo(): if HowIsG2Installed().startswith('git'): g2repo = openGitRepo(path2GSAS2) commit = g2repo.head.commit ctim = commit.committed_datetime.strftime('%d-%b-%Y %H:%M') now = dt.datetime.now().replace( tzinfo=commit.committed_datetime.tzinfo) delta = now - commit.committed_datetime age = delta.total_seconds()/(60*60*24.) tags = g2repo.git.tag('--points-at',commit).split('\n') tags = [i for i in tags if i.isnumeric()] # get sequential version # (tag) gversion = "" if len(tags) >= 1: gversion = f"#{tags[0]}" else: for h in list(g2repo.iter_commits(commit))[:50]: # (don't go too far back) for t in g2repo.git.tag('--points-at',h).split('\n'): if t.isnumeric(): gversion = f"last tag #{t}" break if gversion: break else: gversion = "#?" # if all else fails msg = '' if g2repo.head.is_detached: msg = ("\n" + "**** You have reverted to a past version of GSAS-II. Please \n" + "contact the developers with what is preferred in this version ****" ) else: rc,lc,_ = gitCheckForUpdates(False,g2repo) if rc is None: msg = "\nOn locally defined branch?" elif age > 60 and len(rc) > 0: msg = f"\n**** This version is really old. Please update. >= {len(rc)} updates have been posted ****" elif age > 5 and len(rc) > 0: msg = f"\n**** Please consider updating. >= {len(rc)} updates have been posted" elif len(rc) > 0: msg = f"\nThis GSAS-II version is ~{len(rc)} updates behind current." return f" GSAS-II: {commit.hexsha[:6]} from {ctim} ({age:.1f} days old). Version: {gversion}{msg}" elif HowIsG2Installed() == 'svn': rev = svnGetRev() if rev is None: "no SVN" else: rev = f"SVN version {rev}" # patch 11/2020: warn if GSASII path has not been updated past v4576. # For unknown reasons on Mac with gsas2full, there have been checksum # errors in the .so files that prevented svn from completing updates. # If GSASIIpath.svnChecksumPatch is not present, then the fix for that # has not been retrieved, so warn. Keep for a year or so. try: svnChecksumPatch except: print('Warning GSAS-II incompletely updated. Please contact toby@anl.gov') # end patch return f"Latest GSAS-II revision: {GetVersionNumber()} (svn {rev})" else: try: import git_verinfo as gv if gv.git_tags: msg = f"{' '.join(gv.git_tags)}, Git: {gv.git_version[:6]}" else: msg = (f"{gv.git_version[:6]}; "+ f"Prev ver: {' '.join(gv.git_prevtags)}"+ f", {gv.git_prevtaggedversion[:6]}") return f"GSAS-II version: {msg} (Manual update)" except: pass # all else fails, use the old version number routine return f"GSAS-II installed manually, last revision: {GetVersionNumber()}" #============================================================================== #============================================================================== # routines to interface with git. BASE_HEADER = {'Accept': 'application/vnd.github+json', 'X-GitHub-Api-Version': '2022-11-28'} def openGitRepo(repo_path): try: # patch 3/2024 for svn dir organization return git.Repo(path2GSAS2) except git.InvalidGitRepositoryError: pass return git.Repo(os.path.dirname(path2GSAS2))
[docs] def gitLookup(repo_path,gittag=None,githash=None): '''Return information on a particular checked-in version of GSAS-II. :param str repo_path: location where GSAS-II has been installed :param str gittag: a tag value. :param str githash: hex hash code (abbreviated to as few characters as needed to keep it unique). If None (default), a tag must be supplied. :returns: either None if the tag/hash is not found or a tuple with four values (hash, tag-list, message,date_time) where * hash (str) is the git checking hash code; * tag-list is a list of tags (typically there will be one or two); * message is the check-in message (str) * date_time is the check-in date as a datetime object ''' g2repo = openGitRepo(repo_path) if gittag is not None and githash is not None: raise ValueError("Cannot specify a hash and a tag") if gittag is not None: try: commit = g2repo.tag(gittag).commit except ValueError: return None elif githash is not None: try: commit = g2repo.commit(githash) except git.BadName: return None else: raise ValueError("Must specify either a hash or a tag") tags = [i.name for i in g2repo.tags if i.commit == commit] return (commit.hexsha, tags, commit.message,commit.committed_datetime)
[docs] def gitHash2Tags(githash=None,g2repo=None): '''Find tags associated with a particular git commit. Note that if `githash` cannot be located because it does not exist or is not unique, a `git.BadName` exception is raised. :param str githash: hex hash code (abbreviated to as few characters as needed to keep it unique). If None (default), the HEAD is used. :param str g2repo: git.Rwpo connecton to GSAS-II installation. If None (default) it will be opened. :returns: a list of tags (each a string) ''' if g2repo is None: g2repo = openGitRepo(path2GSAS2) if githash is None: commit = g2repo.head.object else: commit = g2repo.commit(githash) #return [i.name for i in g2repo.tags if i.commit == commit] # slow with a big repo return g2repo.git.tag('--points-at',commit).split('\n')
[docs] def gitTag2Hash(gittag,g2repo=None): '''Provides the hash number for a git tag. Note that if `gittag` cannot be located because it does not exist or is too old and is beyond the `depth` of the local repository, a `ValueError` exception is raised. :param str repo_path: location where GSAS-II has been installed. :param str gittag: a tag value. :param str g2repo: git.Rwpo connecton to GSAS-II installation. If None (default) it will be opened. :returns: a str value with the hex hash for the commit. ''' if g2repo is None: g2repo = openGitRepo(path2GSAS2) return g2repo.tag(gittag).commit.hexsha
[docs] def gitTestGSASII(verbose=True,g2repo=None): '''Test a the status of a GSAS-II installation :param bool verbose: if True (default), status messages are printed :param str g2repo: git.Rwpo connecton to GSAS-II installation. If None (default) it will be opened. :returns: istat, with the status of the repository, with one of the following values: * -1: path is not found * -2: no git repository at path * -3: unable to access repository * value&1==1: repository has local changes (uncommitted/stashed) * value&2==2: repository has been regressed (detached head) * value&4==4: repository has staged files * value&8==8: repository has has been switched to non-master branch * value==0: no problems noted ''' if g2repo is None: if not os.path.exists(path2GSAS2): if verbose: print(f'Warning: Directory {path2GSAS2} not found') return -1 if os.path.exists(os.path.join(path2GSAS2,'..','.git')): path2repo = os.path.join(path2GSAS2,'..') # expected location elif os.path.exists(os.path.join(path2GSAS2,'.git')): path2repo = path2GSAS2 else: if verbose: print(f'Warning: Repository {path2GSAS2} not found') return -2 try: g2repo = openGitRepo(path2repo) except Exception as msg: if verbose: print(f'Warning: Failed to open repository. Error: {msg}') return -3 code = 0 if g2repo.is_dirty(): # has changed files code += 1 count_modified_files = len(g2repo.index.diff(None)) if g2repo.head.is_detached: code += 2 # detached else: if g2repo.active_branch.name != 'master': code += 8 # not on master branch if g2repo.index.diff("HEAD"): code += 4 # staged # test if there are local changes committed return code
[docs] def gitCheckForUpdates(fetch=True,g2repo=None): '''Provides a list of the commits made locally and those in the local copy of the repo that have not been applied. Does not provide useful information in the case of a detached Head (see :func:`countDetachedCommits` for that.) :param bool fetch: if True (default), updates are copied over from the remote repository (git fetch), before checking for changes. :param str g2repo: git.Rwpo connecton to GSAS-II installation. If None (default) it will be opened. :returns: a list containing (remotecommits, localcommits, fetched) where * remotecommits is a list of hex hash numbers of remote commits and * localcommits is a list of hex hash numbers of local commits and * fetched is a bool that will be True if the update (fetch) step ran successfully Note that if the head is detached (GSAS-II has been reverted to an older version) or the branch has been changed, the values for each of the three items above will be None. ''' fetched = False if g2repo is None: g2repo = openGitRepo(path2GSAS2) if g2repo.head.is_detached: return (None,None,None) if fetch: try: g2repo.remote().fetch() fetched = True except git.GitCommandError as msg: print(f'Failed to get updates from {g2repo.remote().url}') try: head = g2repo.head.ref tracking = head.tracking_branch() localcommits = [i.hexsha for i in head.commit.iter_items(g2repo, f'{tracking.path}..{head.path}')] remotecommits = [i.hexsha for i in head.commit.iter_items(g2repo, f'{head.path}..{tracking.path}')] return remotecommits,localcommits,fetched except: return (None,None,None)
[docs] def countDetachedCommits(g2repo=None): '''Count the number of commits that have been made since a commit that is containined in the master branch returns the count and the commit object for the parent commit that connects the current stranded branch to the master branch. None is returned if no connection is found ''' if g2repo is None: g2repo = openGitRepo(path2GSAS2) if not g2repo.head.is_detached: return 0,g2repo.commit() # is detached head in master branch? if g2repo.commit() in g2repo.iter_commits('master'): return 0,g2repo.commit() # count number of commits since leaving master branch masterList = list(g2repo.iter_commits('master')) for c,i in enumerate(g2repo.commit().iter_parents()): if i in masterList: return c+1,i else: return None,None
[docs] def gitCountRegressions(g2repo=None): '''Count the number of new check ins on the master branch since the head was detached as well as any checkins made on the detached head. :returns: mastercount,detachedcount, where * mastercount is the number of check ins made on the master branch remote repository since the reverted check in was first made. * detachedcount is the number of check ins made locally starting from the detached head (hopefully 0) If the connection between the current head and the master branch cannot be established, None is returned for both. If the connection from the reverted check in to the newest version (I don't see how this could happen) then only mastercount will be None. ''' if g2repo is None: g2repo = openGitRepo(path2GSAS2) # get parent of current head that is in master branch detachedcount,parent = countDetachedCommits(g2repo) if detachedcount is None: return None,None mastercount = 0 for h in g2repo.iter_commits('master'): if h == parent: return mastercount,detachedcount mastercount += 1 return None,detachedcount
[docs] def gitGetUpdate(mode='Background'): '''Download the latest updates into the local copy of the GSAS-II repository from the remote master, but don't actually update the GSAS-II files being used. This can be done immediately or in background. In 'Background' mode, a background process is launched. The results from the process are recorded in file in ~/GSASII_bkgUpdate.log (located in %HOME% on Windows). A pointer to the created process is returned. In 'immediate' mode, the update is performed immediately. The function does not return until after the update is downloaded. :returns: In 'Background' mode, returns a Popen object (see subprocess). In 'immediate' mode nothing is returned. ''' if mode == 'Background': return subprocess.Popen([sys.executable, __file__, '--git-fetch']) else: g2repo = openGitRepo(path2GSAS2) g2repo.remote().fetch() if GetConfigValue('debug'): print(f'Updates fetched')
[docs] def gitHistory(values='tag',g2repo=None,maxdepth=100): '''Provides the history of commits to the master, either as tags or hash values :param str values: specifies what type of values are returned. If values=='hash', then hash values or for values=='tag', a list of list of tag(s). :param str g2repo: git.Rwpo connecton to GSAS-II installation. If None (default) it will be opened. :returns: a list of str values where each value is a hash for a commit (values=='hash'), for values=='tag', a list of lists, where a list of tags is provided for each commit. When tags are provided, for any commit that does not have any associated tag(s), that entry is omitted from the list. for values=='both', a list of lists, where a hash is followed by a list of tags (if any) is provided ''' if g2repo is None: g2repo = openGitRepo(path2GSAS2) history = list(g2repo.iter_commits('master')) if values.lower().startswith('h'): return [i.hexsha for i in history] elif values.lower().startswith('t'): tagmap = {} # generate lookup table for to get tags for t in g2repo.tags: tagmap.setdefault(t.commit.hexsha, []).append(t.name) return [tagmap[i.hexsha] for i in history if i.hexsha in tagmap] elif values.lower().startswith('b'): # slow with history >thousands # tagmap = {} # generate lookup table for to get tags # for t in g2repo.tags: # tagmap.setdefault(t.commit.hexsha, []).append(t.name) # return [[i.hexsha]+tagmap.get(i.hexsha,[]) for i in history] # potentially faster code r1 = [[i.hexsha]+g2repo.git.tag('--points-at',i).split('\n') for i in history[:maxdepth]] return [[i[0]] if i[1]=='' else i for i in r1] else: raise ValueError(f'gitHistory has invalid value specified: {value}')
[docs] def getGitBinaryReleases(): '''Retrieves the binaries and download urls of the latest release :returns: a URL dict for GSAS-II binary distributions found in the newest release in a GitHub repository. The repo location is defined in global `G2binURL`. The dict keys are references to binary distributions, which are named as f"{platform}_p{pver}_n{npver}" where platform is determined in :func:`GSASIIpath.GetBinaryPrefix` (linux_64, mac_arm, win_64,...) and where `pver` is the Python version (such as "3.10") and `npver` is the numpy version (such as "1.26"). The value associated with each key contains the full URL to download a tar containing that binary distribution. ''' # Get first page of releases releases = requests.get( url=f"{G2binURL}/releases", headers=BASE_HEADER ).json() # Get assets of latest release assets = requests.get( url=f"{G2binURL}/releases/{releases[-1]['id']}/assets", headers=BASE_HEADER ).json() versions = [] URLs = [] for asset in assets: if asset['name'].endswith('.tgz'): versions.append(asset['name'][:-4]) # Remove .tgz tail URLs.append(asset['browser_download_url']) return dict(zip(versions,URLs))
[docs] def getGitBinaryLoc(npver=None,pyver=None,verbose=True): '''Identify the best GSAS-II binary download location from the distributions in the latest release section of the github repository on the CPU platform, and Python & numpy versions. The CPU & Python versions must match, but the numpy version may only be close. :param str npver: Version number to use for numpy, if None (default) the version is taken from numpy in the current Python interpreter. :param str pyver: Version number to use for Python, if None (default) the version is taken from the current Python interpreter. :param bool verbose: if True (default), status messages are printed :returns: a URL for the tar file (success) or None (failure) ''' bindir = GetBinaryPrefix(pyver) if npver: inpver = intver(npver) else: npver = np.__version__ inpver = intver(np.__version__) # get binaries matching the required install, approximate match for numpy URLdict = getGitBinaryReleases() versions = {} for d in URLdict: if d.startswith(bindir): v = intver(d.rstrip('/').split('_')[3].lstrip('n')) versions[v] = d intVersionsList = sorted(versions.keys()) if not intVersionsList: print('No binaries located to match',bindir) return elif inpver < min(intVersionsList): vsel = min(intVersionsList) if verbose: print( f'Warning: The requested numpy, version, {npver},' f' is older than\n\tthe oldest dist version, {fmtver(vsel)}') elif inpver >= max(intVersionsList): vsel = max(intVersionsList) if verbose and inpver == max(intVersionsList): print( f'The requested numpy version, {npver},' f' matches the binary dist, version {fmtver(vsel)}') elif verbose: print( f'Note: using a binary dist for numpy version {fmtver(vsel)} ' f'which is older than the requested numpy, version {npver}') else: vsel = min(intVersionsList) for v in intVersionsList: if v <= inpver: vsel = v else: if verbose: print( f'FYI: Selecting dist version {fmtver(vsel)}' f' as the requested numpy, version, {npver},' f'\n\tis older than the next dist version {fmtver(v)}') break return URLdict[versions[vsel]]
[docs] def InstallGitBinary(tarURL, instDir, nameByVersion=False, verbose=True): '''Install the GSAS-II binary files into the location specified. :param str tarURL: a URL for the tar file. :param str instDir: location directory to install files. This directory may not exist and will be created if needed. :param bool nameByVersion: if True, files are put into a subdirectory of `instDir`, named to match the tar file (with plaform, Python & numpy versions). Default is False, where the binary files are put directly into `instDir`. :param bool verbose: if True (default), status messages are printed. :returns: None ''' # packages not commonly used so import them here not on startup import requests import tempfile import tarfile # download to scratch tar = tempfile.NamedTemporaryFile(suffix='.tgz',delete=False) try: tar.close() if verbose: print(f'Downloading {tarURL}') r = requests.get(tarURL, allow_redirects=True) open(tar.name, 'wb').write(r.content) # open in tar tarobj = tarfile.open(name=tar.name) if nameByVersion: binnam = os.path.splitext(os.path.split(tarURL)[1])[0] install2dir = os.path.join(instDir,binnam) else: install2dir = instDir for f in tarobj.getmembers(): # loop over files in repository # do a bit of sanity checking for safety. Don't install anything # unless it goes into in the specified directory if sys.platform == "win32" and f.name.startswith('._'): continue # clean up Mac cruft for Windows if '/' in f.name or '\\' in f.name: print(f'skipping file {f.name} -- path alteration not allowed') continue if f.name != os.path.basename(f.name): print(f'skipping file {f.name} -- how did this happen?') continue newfil = os.path.normpath(os.path.join(install2dir,f.name)) tarobj.extract(f, path=install2dir, set_attrs=False) # set file mode and mod/access times (but not ownership) os.chmod(newfil,f.mode) os.utime(newfil,(f.mtime,f.mtime)) if verbose: print(f'Created GSAS-II binary file {newfil}') finally: del tarobj os.unlink(tar.name)
[docs] def GetRepoUpdatesInBackground(): '''Wrapper to make sure that :func:`gitGetUpdate` is called only if git has been used to install GSAS-II. :returns: returns a Popen object (see subprocess) ''' if HowIsG2Installed().startswith('git'): return gitGetUpdate(mode='Background')
[docs] def gitStartUpdate(cmdopts): '''Update GSAS-II in a separate process, by running this script with the options supplied in the call to this function and then exiting GSAS-II. ''' cmd = [sys.executable, __file__] + cmdopts if GetConfigValue('debug'): print('Starting updates with command\n\t'+ f'{" ".join(cmd)}') proc = subprocess.Popen(cmd) # on windows the current process needs to end so that the source files can # be written over. On unix the current process needs to stay running # so the child is not killed. if sys.platform != "win32": proc.wait() sys.exit()
[docs] def dirGitHub(dirlist,orgName=gitTutorialOwn, repoName=gitTutorialRepo): '''Obtain a the contents of a GitHub repository directory using the GitHub REST API. :param str dirlist: a list of sub-directories `['parent','child',sub']` for `parent/child/sub` or `[]` for a file in the top-level directory. :param str orgName: the name of the GitHub organization :param str repoName: the name of the GitHub repository :returns: a list of file names or None if the dirlist info does not reference a directory examples:: dirGitHub([], 'GSASII', 'TutorialTest') dirGitHub(['TOF Sequential Single Peak Fit', 'data']) The first example will get the contents of the top-level directory for the specified repository The second example will provide the contents of the "TOF Sequential Single Peak Fit"/data directory. ''' dirname = '' for item in dirlist: dirname += item + '/' URL = f"https://api.github.com/repos/{orgName}/{repoName}/contents/{dirname}" r = requests.get(URL, allow_redirects=True) try: return [rec['name'] for rec in r.json()] except: return None
[docs] def rawGitHubURL(dirlist,filename,orgName=gitTutorialOwn, repoName=gitTutorialRepo, branchname="master"): '''Create a URL that can be used to view/downlaod the raw version of file in a GitHub repository. :param str dirlist: a list of sub-directories `['parent','child',sub']` for `parent/child/sub` or `[]` for a file in the top-level directory. :param str filename: the name of the file :param str orgName: the name of the GitHub organization :param str repoName: the name of the GitHub repository :param str branchname: the name of the GitHub branch. Defaults to "master". :returns: a URL-encoded URL ''' import urllib.parse # not used very often, import only when needed dirname = '' for item in dirlist: # it's not clear that the URLencode is needed for the directory name dirname += urllib.parse.quote(item) + '/' #filename = urllib.parse.quote(filename) return f"https://raw.githubusercontent.com/{orgName}/{repoName}/{branchname}/{dirname}{filename}"
[docs] def downloadDirContents(dirlist,targetDir,orgName=gitTutorialOwn, repoName=gitTutorialRepo): '''Download the entire contents of a directory from a repository on GitHub. Used to download data for a tutorial. ''' filList = dirGitHub(dirlist, orgName=orgName, repoName=repoName) if filList is None: print(f'Directory {"/".join(dirlist)!r} does not have any files') return None for fil in filList: if fil.lower() == 'index.html': continue URL = rawGitHubURL(dirlist,fil,orgName=orgName,repoName=repoName) r = requests.get(URL, allow_redirects=True) outfil = os.path.join(targetDir,fil) if r.status_code == 200: open(outfil, 'wb').write(r.content) print(f'wrote {outfil}') elif r.status_code == 404: print(f'Warning: {fil} is likely a subdirectory of directory {"/".join(dirlist)!r}') else: print(f'Unexpected web response for {fil}: {r.status_code}') return
#============================================================================== #============================================================================== # routines to interface with subversion g2home = 'https://subversion.xray.aps.anl.gov/pyGSAS' # 'Define the location of the GSAS-II subversion repository' proxycmds = [] # 'Used to hold proxy information for subversion, set if needed in whichsvn' svnLocCache = None # 'Cached location of svn to avoid multiple searches for it'
[docs] def MakeByte2str(arg): '''Convert output from subprocess pipes (bytes) to str (unicode) in Python 3. In Python 2: Leaves output alone (already str). Leaves stuff of other types alone (including unicode in Py2) Works recursively for string-like stuff in nested loops and tuples. typical use:: out = MakeByte2str(out) or:: out,err = MakeByte2str(s.communicate()) ''' if isinstance(arg,str): return arg if isinstance(arg,bytes): try: return arg.decode() except: if GetConfigValue('debug'): print('Decode error') return arg if isinstance(arg,list): return [MakeByte2str(i) for i in arg] if isinstance(arg,tuple): return tuple([MakeByte2str(i) for i in arg]) return arg
[docs] def getsvnProxy(): '''Loads a proxy for subversion from the proxyinfo.txt file created by bootstrap.py or File => Edit Proxy...; If not found, then the standard http_proxy and https_proxy environment variables are scanned (see https://docs.python.org/3/library/urllib.request.html#urllib.request.getproxies) with case ignored and that is used. ''' global proxycmds proxycmds = [] proxyinfo = os.path.join(os.path.expanduser('~/.G2local/'),"proxyinfo.txt") if not os.path.exists(proxyinfo): proxyinfo = os.path.join(path2GSAS2,"proxyinfo.txt") if os.path.exists(proxyinfo): fp = open(proxyinfo,'r') host = fp.readline().strip() # allow file to begin with comments while host.startswith('#'): host = fp.readline().strip() port = fp.readline().strip() etc = [] line = fp.readline() while line: etc.append(line.strip()) line = fp.readline() fp.close() setsvnProxy(host,port,etc) return host,port,etc import urllib.request proxdict = urllib.request.getproxies() varlist = ("https","http") for var in proxdict: if var.lower() in varlist: proxy = proxdict[var] pl = proxy.split(':') if len(pl) < 2: continue host = pl[1].strip('/') port = '' if len(pl) == 3: port = pl[2].strip('/').strip() return host,port,'' return '','',''
[docs] def setsvnProxy(host,port,etc=[]): '''Sets the svn commands needed to use a proxy ''' global proxycmds proxycmds = [] host = host.strip() port = port.strip() if host: proxycmds.append('--config-option') proxycmds.append('servers:global:http-proxy-host='+host) if port: proxycmds.append('--config-option') proxycmds.append('servers:global:http-proxy-port='+port) for item in etc: proxycmds.append(item)
[docs] def whichsvn(): '''Returns a path to the subversion exe file, if any is found. Searches the current path after adding likely places where GSAS-II might install svn. :returns: None if svn is not found or an absolute path to the subversion executable file. ''' # use a previosuly cached svn location global svnLocCache if svnLocCache: return svnLocCache # prepare to find svn is_exe = lambda fpath: os.path.isfile(fpath) and os.access(fpath, os.X_OK) svnprog = 'svn' if sys.platform.startswith('win'): svnprog += '.exe' host,port,etc = getsvnProxy() if GetConfigValue('debug') and host: print('DBG_Using proxy host {} port {}'.format(host,port)) if GetConfigValue('svn_exec'): exe_file = GetConfigValue('svn_exec') print('Using ',exe_file) if is_exe(exe_file): try: p = subprocess.Popen([exe_file,'help'],stdout=subprocess.PIPE) res = p.stdout.read() if not res: return p.communicate() svnLocCache = os.path.abspath(exe_file) return svnLocCache except: pass # add likely places to find subversion when installed with GSAS-II pathlist = os.environ["PATH"].split(os.pathsep) pathlist.insert(0,os.path.split(sys.executable)[0]) pathlist.insert(1,path2GSAS2) for rpt in ('..','bin'),('..','Library','bin'),('svn','bin'),('svn',),('.'): pt = os.path.normpath(os.path.join(path2GSAS2,*rpt)) if os.path.exists(pt): pathlist.insert(0,pt) # search path for svn or svn.exe for path in pathlist: exe_file = os.path.join(path, svnprog) if is_exe(exe_file): try: p = subprocess.Popen([exe_file,'help'],stdout=subprocess.PIPE) res = p.stdout.read() if not res: return p.communicate() svnLocCache = os.path.abspath(exe_file) return svnLocCache except: pass svnLocCache = None
svn_version = None
[docs] def svnVersion(svn=None): '''Get the version number of the current subversion executable. The result is cached, as this takes a bit of time to run and is done a fair number of times. :returns: a string with a version number such as "1.6.6" or None if subversion is not found. ''' global svn_version if svn_version is not None: return svn_version if not svn: svn = whichsvn() if not svn: return cmd = [svn,'--version','--quiet'] s = subprocess.Popen(cmd, stdout=subprocess.PIPE,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) if err: print ('subversion error!\nout=%s'%out) print ('err=%s'%err) s = '\nsvn command: ' for i in cmd: s += i + ' ' print(s) return None svn_version = out.strip() return svn_version
[docs] def svnVersionNumber(svn=None): '''Get the version number of the current subversion executable :returns: a fractional version number such as 1.6 or None if subversion is not found. ''' ver = svnVersion(svn) if not ver: return M,m = ver.split('.')[:2] return int(M)+int(m)/10.
[docs] def svnGetLog(fpath=os.path.split(__file__)[0],version=None): '''Get the revision log information for a specific version of the specified package :param str fpath: path to repository dictionary, defaults to directory where the current file is located. :param int version: the version number to be looked up or None (default) for the latest version. :returns: a dictionary with keys (one hopes) 'author', 'date', 'msg', and 'revision' ''' import xml.etree.ElementTree as ET svn = whichsvn() if not svn: return if version is not None: vstr = '-r'+str(version) else: vstr = '-rHEAD' cmd = [svn,'log',fpath,'--xml',vstr] if proxycmds: cmd += proxycmds s = subprocess.Popen(cmd, stdout=subprocess.PIPE,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) if err: print ('out=%s'%out) print ('err=%s'%err) s = '\nsvn command: ' for i in cmd: s += i + ' ' print(s) return None x = ET.fromstring(out) d = {} for i in x.iter('logentry'): d = {'revision':i.attrib.get('revision','?')} for j in i: d[j.tag] = j.text break # only need the first return d
svnLastError = ''
[docs] def svnGetRev(fpath=os.path.split(__file__)[0],local=True,verbose=True): '''Obtain the version number for the either the last update of the local version or contacts the subversion server to get the latest update version (# of Head). :param str fpath: path to repository dictionary, defaults to directory where the current file is located :param bool local: determines the type of version number, where True (default): returns the latest installed update False: returns the version number of Head on the server :Returns: the version number as an str or None if there is a subversion error (likely because the path is not a repository or svn is not found). The error message is placed in global variable svnLastError ''' import xml.etree.ElementTree as ET svn = whichsvn() if not svn: return if local: cmd = [svn,'info',fpath,'--xml'] else: cmd = [svn,'info',fpath,'--xml','-rHEAD'] if svnVersionNumber() >= 1.6: cmd += ['--non-interactive', '--trust-server-cert'] if proxycmds: cmd += proxycmds # if GetConfigValue('debug'): # s = 'subversion command:\n ' # for i in cmd: s += i + ' ' # print(s) s = subprocess.Popen(cmd, stdout=subprocess.PIPE,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) if err: if verbose: print ('svn failed\n%s'%out) print ('err=%s'%err) print('\nsvn command:',' '.join(cmd)) global svnLastError svnLastError = err return None x = ET.fromstring(out) for i in x.iter('entry'): rev = i.attrib.get('revision') if rev: return rev
[docs] def svnFindLocalChanges(fpath=os.path.split(__file__)[0]): '''Returns a list of files that were changed locally. If no files are changed, the list has length 0 :param fpath: path to repository dictionary, defaults to directory where the current file is located :returns: None if there is a subversion error (likely because the path is not a repository or svn is not found) ''' import xml.etree.ElementTree as ET svn = whichsvn() if not svn: return cmd = [svn,'status',fpath,'--xml'] if proxycmds: cmd += proxycmds s = subprocess.Popen(cmd, stdout=subprocess.PIPE,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) if err: return None x = ET.fromstring(out) changed = [] for i in x.iter('entry'): if i.find('wc-status').attrib.get('item') == 'modified': changed.append(i.attrib.get('path')) return changed
[docs] def svnCleanup(fpath=os.path.split(__file__)[0],verbose=True): '''This runs svn cleanup on a selected local directory. :param str fpath: path to repository dictionary, defaults to directory where the current file is located ''' svn = whichsvn() if not svn: return if verbose: print(u"Performing svn cleanup at "+fpath) cmd = [svn,'cleanup',fpath] if verbose: showsvncmd(cmd) s = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) if err: print(60*"=") print("****** An error was noted, see below *********") print(60*"=") print(err) s = '\nsvn command: ' for i in cmd: s += i + ' ' print(s) #raise Exception('svn cleanup failed') return False elif verbose: print(out) return True
[docs] def svnUpdateDir(fpath=os.path.split(__file__)[0],version=None,verbose=True): '''This performs an update of the files in a local directory from a server. :param str fpath: path to repository dictionary, defaults to directory where the current file is located :param version: the number of the version to be loaded. Used only cast as a string, but should be an integer or something that corresponds to a string representation of an integer value when cast. A value of None (default) causes the latest version on the server to be used. ''' svn = whichsvn() if not svn: return if version: verstr = '-r' + str(version) else: verstr = '-rHEAD' if verbose: print(u"Updating files at "+fpath) cmd = [svn,'update',fpath,verstr, '--non-interactive', '--accept','theirs-conflict','--force'] if svnVersionNumber() >= 1.6: cmd += ['--trust-server-cert'] if proxycmds: cmd += proxycmds #if verbose or GetConfigValue('debug'): if verbose: s = 'subversion command:\n ' for i in cmd: s += i + ' ' print(s) s = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) if err: print(60*"=") print("****** An error was noted, see below *********") print(60*"=") print(err) s = '\nsvn command: ' for i in cmd: s += i + ' ' print(s) if svnCleanup(fpath): s = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) if err: print(60*"=") print("****** Drat, failed again: *********") print(60*"=") print(err) else: return if 'Checksum' in err: # deal with Checksum problem err = svnChecksumPatch(svn,fpath,verstr) if err: print('error from svnChecksumPatch\n\t',err) else: return raise Exception('svn update failed') elif verbose: print(out)
def showsvncmd(cmd): s = '\nsvn command: ' for i in cmd: s += i + ' ' print(s)
[docs] def svnChecksumPatch(svn,fpath,verstr): '''This performs a fix when svn cannot finish an update because of a Checksum mismatch error. This seems to be happening on OS X for unclear reasons. ''' print('\nAttempting patch for svn Checksum mismatch error\n') svnCleanup(fpath) cmd = [svn,'update','--set-depth','empty', os.path.join(fpath,'bindist')] showsvncmd(cmd) if svnVersionNumber() >= 1.6: cmd += ['--non-interactive', '--trust-server-cert'] s = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) #if err: print('error=',err) cmd = [svn,'switch',g2home+'/trunk/bindist', os.path.join(fpath,'bindist'), '--non-interactive', '--trust-server-cert', '--accept', 'theirs-conflict', '--force', '-rHEAD', '--ignore-ancestry'] showsvncmd(cmd) s = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) DownloadG2Binaries(g2home,verbose=True) cmd = [svn,'update','--set-depth','infinity', os.path.join(fpath,'bindist')] if svnVersionNumber() >= 1.6: cmd += ['--non-interactive', '--trust-server-cert'] showsvncmd(cmd) s = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) #if err: print('error=',err) cmd = [svn,'update',fpath,verstr, '--non-interactive', '--accept','theirs-conflict','--force'] if svnVersionNumber() >= 1.6: cmd += ['--trust-server-cert'] if proxycmds: cmd += proxycmds showsvncmd(cmd) s = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) #if err: print('error=',err) return err
[docs] def svnUpgrade(fpath=os.path.split(__file__)[0]): '''This reformats subversion files, which may be needed if an upgrade of subversion is done. :param str fpath: path to repository dictionary, defaults to directory where the current file is located ''' svn = whichsvn() if not svn: return cmd = [svn,'upgrade',fpath,'--non-interactive'] if proxycmds: cmd += proxycmds s = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) if err: print("svn upgrade did not happen (this is probably OK). Messages:") print (err) s = '\nsvn command: ' for i in cmd: s += i + ' ' print(s)
[docs] def svnUpdateProcess(version=None,projectfile=None,branch=None): '''perform an update of GSAS-II in a separate python process''' if not projectfile: projectfile = '' else: projectfile = os.path.realpath(projectfile) print ('restart using %s'%projectfile) if branch: version = branch elif not version: version = '' else: version = str(version) # start the upgrade in a separate interpreter (avoids loading .pyd files) ex = sys.executable if sys.platform == "darwin": # mac requires pythonw which is not always reported as sys.executable if os.path.exists(ex+'w'): ex += 'w' proc = subprocess.Popen([ex,__file__,projectfile,version]) if sys.platform != "win32": proc.wait() sys.exit()
[docs] def svnSwitchDir(rpath,filename,baseURL,loadpath=None,verbose=True): '''This performs a switch command to move files between subversion trees. Note that if the files were previously downloaded, the switch command will update the files to the newest version. :param str rpath: path to locate files, relative to the GSAS-II installation path (defaults to path2GSAS2) :param str URL: the repository URL :param str loadpath: the prefix for the path, if specified. Defaults to path2GSAS2 :param bool verbose: if True (default) diagnostics are printed ''' svn = whichsvn() if not svn: return URL = baseURL[:] if baseURL[-1] != '/': URL = baseURL + '/' + filename else: URL = baseURL + filename if loadpath: fpath = os.path.join(loadpath,rpath,filename) svntmp = os.path.join(loadpath,'.svn','tmp') else: fpath = os.path.join(path2GSAS2,rpath,filename) svntmp = os.path.join(path2GSAS2,'.svn','tmp') # fix up problems with missing empty directories if not os.path.exists(fpath): print('Repairing missing directory',fpath) cmd = [svn,'revert',fpath] s = subprocess.Popen(cmd,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) if out: print(out) if err: print(err) if not os.path.exists(svntmp): print('Repairing missing directory',svntmp) cmd = ['mkdir',svntmp] s = subprocess.Popen(cmd,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) if out: print(out) if err: print(err) cmd = [svn,'switch',URL,fpath, '--non-interactive','--trust-server-cert', '--accept','theirs-conflict','--force','-rHEAD'] if svnVersionNumber(svn) > 1.6: cmd += ['--ignore-ancestry'] if proxycmds: cmd += proxycmds if verbose: print(u"Loading files to "+fpath+u"\n from "+URL) s = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) if err: print(60*"=") print ("****** An error was noted, see below *********") print(60*"=") print ('out=%s'%out) print ('err=%s'%err) s = '\nsvn command: ' for i in cmd: s += i + ' ' print(s) if svnCleanup(fpath): s = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) if err: print(60*"=") print("****** Drat, failed again: *********") print(60*"=") print(err) else: return True return False if verbose: s = '\nsvn command: ' for i in cmd: s += i + ' ' print(s) print('\n=== Output from svn switch'+(43*'=')) print(out.strip()) print((70*'=')+'\n') return True
[docs] def svnInstallDir(URL,loadpath): '''Load a subversion tree into a specified directory :param str URL: the repository URL :param str loadpath: path to locate files ''' svn = whichsvn() if not svn: return cmd = [svn,'co',URL,loadpath,'--non-interactive'] if svnVersionNumber() >= 1.6: cmd += ['--trust-server-cert'] print("Loading files from "+URL) if proxycmds: cmd += proxycmds s = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) #this fails too easily if err: print(60*"=") print ("****** An error was noted, see below *********") print(60*"=") print (err) s = '\nsvn command: ' for i in cmd: s += i + ' ' print(s) if svnCleanup(loadpath): s = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) if err: print(60*"=") print("****** Drat, failed again: *********") print(60*"=") print(err) return False else: return False print ("Files installed at: "+loadpath) return True
[docs] def svnGetFileStatus(fpath=os.path.split(__file__)[0],version=None): '''Compare file status to repository (svn status -u) :returns: updatecount,modcount,locked where updatecount is the number of files waiting to be updated from repository modcount is the number of files that have been modified locally locked is the number of files tagged as locked ''' import xml.etree.ElementTree as ET svn = whichsvn() if version is not None: vstr = '-r'+str(version) else: vstr = '-rHEAD' cmd = [svn,'st',fpath,'--xml','-u',vstr] if svnVersionNumber() >= 1.6: cmd += ['--non-interactive', '--trust-server-cert'] if proxycmds: cmd += proxycmds s = subprocess.Popen(cmd, stdout=subprocess.PIPE,stderr=subprocess.PIPE) out,err = MakeByte2str(s.communicate()) if err: print ('out=%s'%out) print ('err=%s'%err) s = '\nsvn command: ' for i in cmd: s += i + ' ' print(s) return None locked = 0 updatecount = 0 modcount = 0 x = ET.fromstring(out) for i0 in x.iter('entry'): filename = i0.attrib.get('path','?') wc_rev = '' status = '' switched = '' for i1 in i0.iter('wc-status'): wc_rev = i1.attrib.get('revision','') status = i1.attrib.get('item','') switched = i1.attrib.get('switched','') if i1.attrib.get('wc-locked',''): locked += 1 if status == "unversioned": continue if switched == "true": continue if status == "modified": modcount += 1 elif status == "normal": updatecount += 1 file_rev = '' for i2 in i1.iter('commit'): file_rev = i2.attrib.get('revision','') local_status = '' for i1 in i0.iter('repos-status'): local_status = i1.attrib.get('item','') #print(filename,wc_rev,file_rev,status,local_status,switched) return updatecount,modcount,locked
[docs] def svnList(URL,verbose=True): '''Get a list of subdirectories from and svn repository ''' svn = whichsvn() if not svn: print('**** unable to load files: svn not found ****') return '' # get binaries matching the required type -- other than for the numpy version cmd = [svn, 'list', URL,'--non-interactive', '--trust-server-cert'] if proxycmds: cmd += proxycmds if verbose: s = 'Running svn command:\n ' for i in cmd: s += i + ' ' print(s) p = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE) res,err = MakeByte2str(p.communicate()) return res
[docs] def DownloadG2Binaries(g2home,verbose=True): '''Download GSAS-II binaries from appropriate section of the GSAS-II svn repository based on the platform, numpy and Python version ''' bindir = GetBinaryPrefix() #npver = 'n{}.{}'.format(*np.__version__.split('.')[0:2]) inpver = intver(np.__version__) svn = whichsvn() if not svn: print('**** unable to load files: svn not found ****') return '' # get binaries matching the required type -- other than for the numpy version cmd = [svn, 'list', g2home + '/Binaries/','--non-interactive', '--trust-server-cert'] if proxycmds: cmd += proxycmds if verbose: s = 'Running svn command:\n ' for i in cmd: s += i + ' ' print(s) p = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE) res,err = MakeByte2str(p.communicate()) versions = {} for d in res.split(): if d.startswith(bindir): v = intver(d.rstrip('/').split('_')[3].lstrip('n')) versions[v] = d intVersionsList = sorted(versions.keys()) if not intVersionsList: print('No binaries located matching',bindir) return elif inpver < min(intVersionsList): vsel = min(intVersionsList) print('Warning: The current numpy version, {}, is older than\n\tthe oldest dist version, {}' .format(np.__version__,fmtver(vsel))) elif inpver >= max(intVersionsList): vsel = max(intVersionsList) if verbose: print( 'FYI: The current numpy version, {}, is newer than the newest dist version {}' .format(np.__version__,fmtver(vsel))) else: vsel = min(intVersionsList) for v in intVersionsList: if v <= inpver: vsel = v else: if verbose: print( 'FYI: Selecting dist version {} as the current numpy version, {},\n\tis older than the next dist version {}' .format(fmtver(vsel),np.__version__,fmtver(v))) break distdir = g2home + '/Binaries/' + versions[vsel] # switch reset command: distdir = g2home + '/trunk/bindist' svnSwitchDir('bindist','',distdir,verbose=verbose) return os.path.join(path2GSAS2,'bindist')
# def svnTestBranch(loc=None): # '''Returns the name of the branch directory if the installation has been switched. # Returns none, if not a branch # the test 2frame branch. False otherwise # ''' # if loc is None: loc = path2GSAS2 # svn = whichsvn() # if not svn: # print('**** unable to load files: svn not found ****') # return '' # cmd = [svn, 'info', loc] # if proxycmds: cmd += proxycmds # p = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE) # res,err = MakeByte2str(p.communicate()) # for l in res.split('\n'): # if "Relative URL:" in l: break # if "/branch/" in l: # return l[l.find("/branch/")+8:].strip() # else: # return None
[docs] def svnSwitch2branch(branch=None,loc=None,svnHome=None): '''Switch to a subversion branch if specified. Switches to trunk otherwise. ''' if svnHome is None: svnHome = g2home svnURL = svnHome + '/trunk' if branch: if svnHome.endswith('/'): svnURL = svnHome[:-1] else: svnURL = svnHome if branch.startswith('/'): svnURL += branch else: svnURL += '/' + branch svnSwitchDir('','',svnURL,loadpath=loc)
#============================================================================== #==============================================================================
[docs] def runScript(cmds=[], wait=False, G2frame=None): '''run a shell script of commands in an external process :param list cmds: a list of str's, each ietm containing a shell (cmd.exe or bash) command :param bool wait: if True indicates the commands should be run and then the script should return. If False, then the currently running Python will exit. Default is False :param wx.Frame G2frame: provides the location of the current .gpx file to be used to restart GSAS-II after running the commands, if wait is False. Default is None which prevents restarting GSAS-II regardless of the value of wait. ''' import tempfile if not cmds: #debug print('nothing to do in runScript') return if sys.platform != "win32": suffix = '.sh' else: suffix = '.bat' fp = tempfile.NamedTemporaryFile(mode='w', suffix=suffix, delete=False) shellname = fp.name for line in cmds: fp.write(line) fp.write('\n') if not wait: if G2frame: projectfile = '' if G2frame.GSASprojectfile: projectfile = os.path.realpath(G2frame.GSASprojectfile) main = os.path.join(path2GSAS2,'GSASII.py') ex = sys.executable if sys.platform == "darwin": # mac requires pythonw which is not always reported as sys.executable if os.path.exists(ex+'w'): ex += 'w' print ('restart using ',' '.join([ex,main,projectfile])) fp.write(' '.join([ex,main,projectfile])) fp.write('\n') fp.close() # start the upgrade in a separate interpreter (avoids loading .pyd files) if sys.platform != "win32": proc = subprocess.Popen(['bash',shellname]) else: proc = subprocess.Popen([shellname],shell=True) if wait: proc.wait() else: if sys.platform != "win32": proc.wait() sys.exit()
[docs] def IPyBreak_base(userMsg=None): '''A routine that invokes an IPython session at the calling location This routine is only used when debug=True is set in config.py ''' savehook = sys.excepthook # save the exception hook try: from IPython.terminal.embed import InteractiveShellEmbed except ImportError: try: # try the IPython 0.12 approach from IPython.frontend.terminal.embed import InteractiveShellEmbed except ImportError: print ('IPython InteractiveShellEmbed not found') return import inspect #from IPython import __version__ #if __version__.startswith('8.12.'): # see https://github.com/ipython/ipython/issues/13966 from IPython.core import getipython if getipython.get_ipython() is None: ipshell = InteractiveShellEmbed.instance() else: ipshell = InteractiveShellEmbed() frame = inspect.currentframe().f_back msg = 'Entering IPython console inside {0.f_code.co_filename} at line {0.f_lineno}\n'.format(frame) if userMsg: msg += userMsg # globals().update(locals()) # This might help with vars inside list comprehensions, etc. ipshell(msg,stack_depth=2) # Go up one level, to see the calling routine sys.excepthook = savehook # reset IPython's change to the exception hook
[docs] def exceptHook(*args): '''A routine to be called when an exception occurs. It prints the traceback with fancy formatting and then calls an IPython shell with the environment of the exception location. This routine is only used when debug=True is set in config.py ''' try: from IPython.core import ultratb except: pass try: from IPython.terminal.embed import InteractiveShellEmbed import IPython.core if sys.platform.startswith('win'): IPython.core.ultratb.FormattedTB(call_pdb=False,color_scheme='NoColor')(*args) else: IPython.core.ultratb.FormattedTB(call_pdb=False,color_scheme='LightBG')(*args) from IPython.core import getipython if getipython.get_ipython() is None: ipshell = InteractiveShellEmbed.instance() else: ipshell = InteractiveShellEmbed() except ImportError: print ('IPython not installed or is really old') return import inspect frame = inspect.getinnerframes(args[2])[-1][0] msg = 'Entering IPython console at {0.f_code.co_filename} at line {0.f_lineno}\n'.format(frame) savehook = sys.excepthook # save the exception hook try: ipshell(msg,local_ns=frame.f_locals,global_ns=frame.f_globals) # newest (IPython >= 8) except DeprecationWarning: # IPython <=7 try: # IPython >=5 class c(object): pass pseudomod = c() # create something that acts like a module pseudomod.__dict__ = frame.f_locals InteractiveShellEmbed(banner1=msg)(module=pseudomod,global_ns=frame.f_globals) except: # 'IPython <5 InteractiveShellEmbed(banner1=msg)(local_ns=frame.f_locals,global_ns=frame.f_globals) sys.excepthook = savehook # reset IPython's change to the exception hook
[docs] def DoNothing(): '''A routine that does nothing. This is called in place of IPyBreak and pdbBreak except when the debug option is set True in config.py ''' pass
[docs] def InvokeDebugOpts(): 'Called in GSASII.py to set up debug options' if any('SPYDER' in name for name in os.environ): print('Running from Spyder, keeping breakpoint() active & skipping exception trapping') elif GetConfigValue('debug'): try: import pdb global pdbBreak pdbBreak = pdb.set_trace import IPython global IPyBreak IPyBreak = IPyBreak_base sys.excepthook = exceptHook os.environ['PYTHONBREAKPOINT'] = 'GSASIIpath.IPyBreak_base' print ('Debug on: IPython: Exceptions and G2path.IPyBreak(); pdb: G2path.pdbBreak()') except: print ('Debug on failed. IPython not installed?') else: # not in spyder or debug enabled, hide breakpoints os.environ['PYTHONBREAKPOINT'] = '0'
[docs] def TestSPG(fpth): '''Test if pyspg.[so,.pyd] can be run from a location in the path ''' try: if not os.path.exists(fpth): return False if not glob.glob(os.path.join(fpth,'pyspg.*')): return False except: return False savpath = sys.path[:] sys.path = [fpth] # test to see if a shared library can be used try: import pyspg pyspg.sgforpy('P -1') except Exception as err: print(70*'=') print(f'Failed to run pyspg in {fpth!r}\nerror: {err}') print(70*'=') sys.path = savpath return False sys.path = savpath return True
[docs] def SetBinaryPath(printInfo=False, loadBinary=False): ''' Add location of GSAS-II shared libraries (binaries: .so or .pyd files) to path This routine must be executed after GSASIIpath is imported and before any other GSAS-II imports are done, since they assume binary files are in path :param bool printInfo: When True, information is printed to show has happened (default is False) :param bool loadBinary: when True, if the binary files fail to load, an attempt is made to download the binaries (default is False). TODO: the loadBinary option not implemented at present and is not used in any of the calls to SetBinaryPath ''' # do this only once no matter how many times it is called global BinaryPathLoaded,binaryPath,BinaryPathFailed if BinaryPathLoaded: return if BinaryPathFailed: return try: inpver = intver(np.__version__) except (AttributeError,TypeError): # happens on building docs return if path2GSAS2 not in sys.path: sys.path.insert(0,path2GSAS2) # make sure current path is used binpath = None binprfx = GetBinaryPrefix() # places to look for the GSAS-II binary directory binseapath = [os.path.abspath(sys.path[0])] # where Python is installed binseapath += [os.path.abspath(os.path.dirname(__file__))] # directory where this file is found binseapath += [os.path.dirname(binseapath[-1])] # parent of above directory binseapath += [os.path.expanduser('~/.GSASII')] # directory in user's home def appendIfExists(searchpathlist,loc,subdir): newpath = os.path.join(loc,subdir) if os.path.exists(newpath): if newpath in searchpathlist: return searchpathlist.append(newpath) for loc in binseapath: # Look at bin directory (created by a local compile) before looking for standard dist files searchpathlist = [] appendIfExists(searchpathlist,loc,'bin') appendIfExists(searchpathlist,loc,'bindist') appendIfExists(searchpathlist,loc,'GSASII-bin') # also look for directories named by platform etc in loc/AllBinaries or loc versions = {} namedpath = glob.glob(os.path.join(loc,'AllBinaries',binprfx+'*')) namedpath += glob.glob(os.path.join(loc,'GSASII-bin',binprfx+'*')) for d in namedpath: d = os.path.realpath(d) v = intver(d.rstrip('/').split('_')[-1].lstrip('n')) versions[v] = d vmin = None vmax = None # try to order the search in a way that makes sense for v in sorted(versions.keys()): if v <= inpver: vmin = v elif v > inpver: vmax = v break if vmin in versions: searchpathlist.append(versions[vmin]) if vmax in versions: searchpathlist.append(versions[vmax]) for fpth in searchpathlist: if TestSPG(fpth): binpath = fpth # got one that works, look no farther! break else: continue break if binpath: # were GSAS-II binaries found? binaryPath = binpath BinaryPathLoaded = True elif not loadBinary: print('*** ERROR: Unable to find GSAS-II binaries. Much of GSAS-II cannot function') BinaryPathFailed = True return None else: # try loading them BinaryPathFailed = True raise Exception("**** ERROR GSAS-II binary libraries not found and loadBinary not"+ "\nimplemented in SetBinaryPath, GSAS-II cannot run ****""") # if printInfo: # print('Attempting to download GSAS-II binary files...') # try: # binpath = DownloadG2Binaries(g2home) # except AttributeError: # this happens when building in Read The Docs # if printInfo: # print('Problem with download') # if binpath and TestSPG(binpath): # if printInfo: # print('GSAS-II binary directory: {}'.format(binpath)) # sys.path.insert(0,binpath) # binaryPath = binpath # BinaryPathLoaded = True # # this must be imported before anything that imports any .pyd/.so file for GSASII # else: # if printInfo: # print(75*'*') # print('Use of GSAS-II binary directory {} failed!'.format(binpath)) # print(75*'*') # raise Exception("**** ERROR GSAS-II binary libraries not found, GSAS-II cannot run ****") # add the data import and export directory to the search path if binpath not in sys.path: sys.path.insert(0,binpath) if printInfo: print(f'GSAS-II binary directory: {binpath}') newpath = os.path.join(path2GSAS2,'imports') if newpath not in sys.path: sys.path.append(newpath) newpath = os.path.join(path2GSAS2,'exports') if newpath not in sys.path: sys.path.append(newpath) LoadConfig(printInfo)
def LoadConfig(printInfo=True): # setup read of config.py, if present global configDict try: import config configDict = config.__dict__ import inspect vals = [True for i in inspect.getmembers(config) if '__' not in i[0]] if printInfo: print (str(len(vals))+' values read from config file '+os.path.abspath(config.__file__)) except ImportError: configDict = {'Clip_on':True} except Exception as err: print(60*'*',"\nError reading config.py file") if printInfo: import traceback print(traceback.format_exc()) print(60*'*') configDict = {'Clip_on':True} # def MacStartGSASII(g2script,project=''): # '''Start a new instance of GSAS-II by opening a new terminal window and starting # a new GSAS-II process. Used on Mac OS X only. # :param str g2script: file name for the GSASII.py script # :param str project: GSAS-II project (.gpx) file to be opened, default is blank # which opens a new project # ''' # if project and os.path.splitext(project)[1] != '.gpx': # print(f'file {project} cannot be used. Not GSAS-II project (.gpx) file') # return # if project and not os.path.exists(project): # print(f'file {project} cannot be found.') # return # elif project: # project = os.path.abspath(project) # if not os.path.exists(project): # print(f'lost project {project} with abspath') # raise Exception(f'lost project {project} with abspath') # g2script = os.path.abspath(g2script) # pythonapp = sys.executable # if os.path.exists(pythonapp+'w'): pythonapp += 'w' # script = f''' # set python to "{pythonapp}" # set appwithpath to "{g2script}" # set filename to "{project}" # set filename to the quoted form of the POSIX path of filename # tell application "Terminal" # activate # do script python & " " & appwithpath & " " & filename & "; exit" # end tell # ''' # subprocess.Popen(["osascript","-e",script])
[docs] def MacRunScript(script): '''Start a bash script in a new terminal window. Used on Mac OS X only. :param str script: file name for a bash script ''' script = os.path.abspath(script) osascript = ''' set bash to "/bin/bash" set filename to "{}" tell application "Terminal" activate do script bash & " " & filename & "; exit" end tell '''.format(script) subprocess.Popen(["osascript","-e",osascript])
#============================================================================== #============================================================================== # conda/pip routines
[docs] def findConda(): '''Determines if GSAS-II has been installed as g2conda or gsas2full with conda located relative to this file. We could also look for conda relative to the python (sys.executable) image, but I don't want to muck around with python that someone else installed. ''' parent = os.path.split(path2GSAS2)[0] if sys.platform != "win32": activate = os.path.join(parent,'bin','activate') conda = os.path.join(parent,'bin','conda') else: activate = os.path.join(parent,'Scripts','activate.bat') conda = os.path.join(parent,'condabin','conda.bat') if os.path.exists(activate) and os.path.exists(conda): return conda,activate else: return None
[docs] def condaTest(requireAPI=False): '''Returns True if it appears that Python is being run under Anaconda Python with conda present. Tests for conda environment vars and that the conda package is installed in the current environment. :returns: True, if running under Conda ''' if not all([(i in os.environ) for i in ('CONDA_DEFAULT_ENV','CONDA_EXE', 'CONDA_PREFIX', 'CONDA_PYTHON_EXE')]): return False if requireAPI: # is the conda package available? try: import conda.cli.python_api except: print('You do not have the conda package installed in this environment', '\nConsider using the "conda install conda" command') return False # There is no foolproof way to check if someone activates conda # but then calls a different Python using its path... # ...If we are in the base environment then the conda Python # should be the same path as the one currently being run: if os.environ['CONDA_DEFAULT_ENV'] == 'base': try: if os.path.samefile(os.environ['CONDA_PYTHON_EXE'], sys.executable): return True except: return False # ...If not in the base environment, what we can do is check if the # python we are running in shares the beginning part of its path with # the one in the base installation: dir1 = os.path.dirname(os.environ['CONDA_PYTHON_EXE']) dir2 = os.path.dirname(sys.executable) if sys.platform != "win32": # python in .../bin/.. dir1 = os.path.dirname(dir1) dir2 = os.path.dirname(dir2) return commonPath(dir1,dir2)
[docs] def condaInstall(packageList): '''Installs one or more packages using the anaconda conda package manager. Can be used to install multiple packages and optionally use channels. :param list packageList: a list of strings with name(s) of packages and optionally conda options. Examples:: packageList=['gsl'] packageList=['-c','conda-forge','wxpython'] packageList=['numpy','scipy','matplotlib'] :returns: None if the the command ran normally, or an error message if it did not. ''' try: import conda.cli.python_api except: print('You do not have the conda package installed in this environment', '\nConsider using the "conda install conda" command') return None try: (out, err, rc) = conda.cli.python_api.run_command( conda.cli.python_api.Commands.INSTALL,packageList # use_exception_handler=True#, stdout=sys.stdout, stderr=sys.stderr) ) #print('rc=',rc) print('Ran conda. output follows...') print(70*'='+'\n'+out+'\n'+70*'=') #print('err=',err) if rc != 0: return str(out) except Exception as msg: print("Error occurred, see below\n",msg) return "error occurred" return None
[docs] def fullsplit(fil,prev=None): '''recursive routine to split all levels of directory names ''' if prev is None: # first call: normalize and drop file name fil = os.path.normcase(os.path.abspath(os.path.dirname(fil))) prev = [] i,j = os.path.split(fil) if j: prev.insert(0,j) out = fullsplit(i,prev) else: return [i]+prev return out
[docs] def commonPath(dir1,dir2): '''Check if two directories share a path. Note that paths are considered the same if either directory is a subdirectory of the other, but not if they are in different subdirectories /a/b/c shares a path with /a/b/c/d but /a/b/c/d and /a/b/c/e do not. :returns: True if the paths are common ''' for i,j in zip(fullsplit(dir1),fullsplit(dir2)): if i != j: return False return True
[docs] def pipInstall(packageList): '''Installs one or more packages using the pip package installer. Use of this should be avoided if conda can be used (see :func:`condaTest` to test for conda). Can be used to install multiple packages together. One can use pip options, but this is probably not needed. :param list packageList: a list of strings with name(s) of packages Examples:: packageList=['gsl'] packageList=['wxpython','matplotlib','scipy'] packageList=[r'\\Mac\\Home\\Scratch\\wheels\\pygsl-2.3.3-py3-none-any.whl'] packageList=['z:/Scratch/wheels/pygsl-2.3.3-py3-none-any.whl'] :returns: None if the the command ran normally, or an error message if it did not. ''' try: subprocess.check_call([sys.executable, '-m', 'pip', 'install']+packageList) except Exception as msg: return msg return None
[docs] def condaEnvCreate(envname, packageList, force=False): '''Create a Python interpreter in a new conda environment. Use this when there is a potential conflict between packages and it would be better to keep the packages separate (which is one of the reasons conda supports environments). Note that conda should be run from the case environment; this attempts to deal with issues if it is not. :param str envname: the name of the environment to be created. If the environment exists, it will be overwritten only if force is True. :param list packageList: a list of conda install create command options, such as:: ['python=3.7', 'conda', 'gsl', 'diffpy.pdffit2', '-c', 'conda-forge', '-c', 'diffpy'] :param bool force: if False (default) an error will be generated if an environment exists :returns: (status,msg) where status is True if an error occurs and msg is a string with error information if status is True or the location of the newly-created Python interpreter. ''' if not all([(i in os.environ) for i in ('CONDA_DEFAULT_ENV', 'CONDA_EXE', 'CONDA_PREFIX', 'CONDA_PYTHON_EXE')]): return True,'not running under conda?' try: import conda.cli.python_api except: return True,'conda package not available (in environment)' # workaround for bug that avoids nesting packages if running from an # environment (see https://github.com/conda/conda/issues/11493) p = os.path.dirname(os.path.dirname(os.environ['CONDA_EXE'])) if not os.path.exists(os.path.join(p,'envs')): msg = ('Error derived installation path not found: '+ os.path.join(p,'envs')) print(msg) return True,msg newenv = os.path.join(p,'envs',envname) if os.path.exists(newenv) and not force: msg = 'path '+newenv+' already exists and force is not set, aborting' print(msg) return True,msg pathList = ['-p',newenv] try: (out, err, rc) = conda.cli.python_api.run_command( conda.cli.python_api.Commands.CREATE, packageList + pathList, use_exception_handler=True, stdout=sys.stdout, stderr=sys.stderr ) #print('rc=',rc) #print('out=',out) #print('err=',err) if rc != 0: return True,str(out) if sys.platform == "win32": newpython = os.path.join(newenv,'python.exe') else: newpython = os.path.join(newenv,'bin','python') if os.path.exists(newpython): return False,newpython return True,'Unexpected, '+newpython+' not found' except Exception as msg: print("Error occurred, see below\n",msg) return True,'Error: '+str(msg)
[docs] def addCondaPkg(): '''Install the conda API into the current conda environment using the command line, so that the API can be used in the current Python interpreter Attempts to do this without a shell failed on the Mac because it seems that the environment was inherited; seems to work w/o shell on Windows. ''' if not all([(i in os.environ) for i in ('CONDA_DEFAULT_ENV','CONDA_EXE', 'CONDA_PREFIX', 'CONDA_PYTHON_EXE')]): return None condaexe = os.environ['CONDA_EXE'] currenv = os.environ['CONDA_DEFAULT_ENV'] if sys.platform == "win32": cmd = [os.environ['CONDA_EXE'],'install','conda','-n',currenv,'-y'] p = subprocess.Popen(cmd, #stdout=subprocess.PIPE, stderr=subprocess.PIPE) else: script = 'source ' + os.path.join( os.path.dirname(os.environ['CONDA_PYTHON_EXE']), 'activate') + ' base; ' script += 'conda install conda -n '+currenv+' -y' p = subprocess.Popen(script,shell=True,env={}, #stdout=subprocess.PIPE, stderr=subprocess.PIPE) out,err = MakeByte2str(p.communicate()) #print('Output from adding conda:\n',out) if err: print('Note error/warning:') print(err) if currenv == "base": print('\nUnexpected action: adding conda to base environment???')
#============================================================================== #============================================================================== # routines for reorg of GSAS-II directory layout
[docs] def getIconFile(imgfile): '''Looks in either the main GSAS-II install location (old) or subdirectory icons (after reorg) for an icon :returns: the full path for the icon file ''' if os.path.exists(os.path.join(path2GSAS2,'icons',imgfile)): return os.path.join(path2GSAS2,'icons',imgfile) if os.path.exists(os.path.join(path2GSAS2,imgfile)): # patch 3/2024 for svn dir organization return os.path.join(path2GSAS2,imgfile) print(f'getIconFile Warning: file {imgfile} not found') return None
#============================================================================== #==============================================================================
[docs] def makeScriptShortcut(): '''Creates a shortcut to GSAS-II in the current Python installation so that "import G2script" (or "import G2script as GSASIIscripting") can be used without having to add GSASII to the path. The new shortcut is then tested. :returns: returns the name of the created file if successful. None indicates an error. ''' import datetime as dt for p in sys.path: if 'site-packages' in p: break else: print('No site-packages directory found in Python path') return newfil = os.path.join(p,'G2script.py') fp = open(newfil,'w') fp.write(f'#Created in makeScriptShortcut from {__file__}') fp.write(dt.datetime.strftime(dt.datetime.now(), " at %Y-%m-%dT%H:%M\n")) fp.write(f"""import sys,os Path2GSASII='{path2GSAS2}' if os.path.exists(os.path.join(Path2GSASII,'GSASIIscriptable.py')): print('setting up GSASIIscriptable from',Path2GSASII) if Path2GSASII not in sys.path: sys.path.insert(0,Path2GSASII) from GSASIIscriptable import * else: print('GSASIIscriptable not found in ',Path2GSASII) print('Rerun "Install GSASIIscriptable shortcut" from inside GSAS-II') sys.exit() """) fp.close() print('Created file',newfil) try: import G2script except ImportError: print('Unexpected error: import of G2script failed!') return return newfil
# see if a directory for local modifications is defined. If so, stick that in the path if os.path.exists(os.path.expanduser('~/.G2local/')): sys.path.insert(0,os.path.expanduser('~/.G2local/')) fl = glob.glob(os.path.expanduser('~/.G2local/GSASII*.py*')) files = "" prev = None for f in sorted(fl): # make a list of files, dropping .pyc files where a .py exists f = os.path.split(f)[1] if os.path.splitext(f)[0] == prev: continue prev = os.path.splitext(f)[0] if files: files += ", " files += f if files: print("*"*75) print("Warning: the following source files are locally overridden in "+os.path.expanduser('~/.G2local/')) print(" "+files) print("*"*75) BinaryPathFailed = False BinaryPathLoaded = False binaryPath = '' IPyBreak = DoNothing pdbBreak = DoNothing if __name__ == '__main__': '''What follows is called to update (or downdate) GSAS-II in a separate process. ''' # check what type of update is being called for gitUpdate = False preupdateType = None updateType = None regressversion = None help = False project = None version = None for arg in sys.argv[1:]: if '--git-fetch' in arg: # pulls latest updates from server but does not apply them if preupdateType or updateType: print(f'previous option conflicts with {arg}') help = True break updateType = 'fetch' elif '--git-reset' in arg: # restores locally changed GSAS-II files to distributed versions also updates gitUpdate = True if preupdateType: print(f'previous option conflicts with {arg}') help = True break preupdateType = 'reset' elif '--git-stash' in arg: # saves locally changed GSAS-II files in "stash" argsplit = arg.split('=') if len(argsplit) == 1: message = None elif len(argsplit) == 2: message=argsplit[1] if message.startswith('"') and message.endswith('"'): message = message.strip('"') if message.startswith("'") and message.endswith("'"): message = message.strip("'") message = message.replace('"',"'") # double quote not allowed else: print('invalid form for --git-stash') help = True break gitUpdate = True if preupdateType: print(f'previous option conflicts with {arg}') help = True break preupdateType = 'stash' elif '--git-update' in arg: # update to latest downloaded version gitUpdate = True if updateType: print(f'previous option conflicts with {arg}') help = True break updateType = 'update' elif '--git-regress' in arg: argsplit = arg.split('=') if len(argsplit) != 2: print('invalid form for --git-regress') help = True break gitversion = argsplit[1] # make sure the version or tag supplied is valid and convert to # a full sha hash g2repo = openGitRepo(path2GSAS2) try: regressversion = g2repo.commit(gitversion).hexsha except git.BadName: print(f'invalid version specified ({version}) for GitHub regression') help = True break if updateType: print(f'previous option conflicts with {arg}') help = True break updateType = 'regress' gitUpdate = True elif '--help' in arg: help = True break elif os.path.exists(arg): # svn args parsed later; this is just checking project = arg pass else: # for old-style svn update if arg.isdecimal() or not arg: #version = arg pass else: print(f'unknown arg {arg}') help = True if gitUpdate and version: print('Conflicting arguments (git & svn opts combined?)') help = True if help or len(sys.argv) == 1: print('''Options when running GSASIIpath.py standalone to update/regress repository from svn repository: python GSASIIpath.py <project> <version> where <project> is an optional path reference to a .gpx file and <version> is a specific GSAS-II version to install (default is latest) to update/regress repository from git repository: python GSASIIpath.py option <project> where option will be one or more of the following: --git-fetch downloads lastest changes from repo any other options will be ignored --git-stash="message" saves local changes --git-reset discards local changes --git-update --git-regress=version and where <project> is an optional path reference to a .gpx file Note: --git-reset and --git-stash cannot be used together. Likewise --git-update and --git-regress cannot be used together. However either --git-reset or --git-stash can be used with either --git-update or --git-regress. --git-fetch cannot be used with any other options. ''') sys.exit() if updateType == 'fetch': # download the latest updates from GitHub to the local repository # in background while GSAS-II runs no updates are applied logfile = os.path.join(os.path.expanduser('~'),'GSASII_bkgUpdate.log') mode = 'a' # don't let log file get too large (20K bytes) if os.path.exists(logfile) and os.path.getsize(logfile) > 20000: mode = 'w' # if file open fails, there is probably a concurent update process try: fp = open(logfile,mode) except: print('background git update was unable to open log file') sys.exit() fp.write('Starting background git update') fp.write(dt.datetime.strftime(dt.datetime.now(), " at %Y-%m-%dT%H:%M\n")) try: import git except: fp.write('git import failed') fp.close() sys.exit() try: g2repo = openGitRepo(path2GSAS2) g2repo.remote().fetch() fp.write(f'Updates fetched\n') except Exception as msg: fp.write(f'Update failed with message {msg}\n') if g2repo.head.is_detached: fp.write(f'Status: reverted to an old install\n') else: try: rc,lc,_ = gitCheckForUpdates(False,g2repo) if len(rc) == 0: fp.write('Status: no unapplied commits\n') else: fp.write(f'Status: unapplied commits now {len(rc)}\n') except Exception as msg: fp.write(f'\ngitCheckForUpdates failed with message {msg}\n') fp.write('update done at') fp.write(dt.datetime.strftime(dt.datetime.now(), " at %Y-%m-%dT%H:%M\n\n")) fp.close() sys.exit() if gitUpdate: import time time.sleep(1) # delay to give the main process a chance to exit # so we don't change code for a running process # windows does not like that try: import git except: print('git import failed') sys.exit() try: g2repo = openGitRepo(path2GSAS2) except Exception as msg: print(f'Update failed with message {msg}\n') sys.exit() print('git repo opened') if preupdateType == 'reset': # --git-reset (preupdateType = 'reset') print('Restoring locally-updated GSAS-II files to original status') openGitRepo(path2GSAS2).git.reset('--hard','origin/master') try: if g2repo.active_branch.name != 'master': g2repo.git.switch('master') except TypeError: # fails on detached head pass elif preupdateType == 'stash': # --git-stash (preupdateType = 'stash') print('Stashing locally-updated GSAS-II files') if message: g2repo.git.stash(f'-m"{message}"') else: g2repo.git.stash() # Update to the latest GSAS-II version. This assumes that a fetch has # been done prior, or this will only update to the last time that # it was done. if updateType == 'update': # --git-update (updateType = 'update') if g2repo.is_dirty(): print('Cannot update a directory with locally-made changes') sys.exit() print('Updating to latest GSAS-II version') if g2repo.head.is_detached: g2repo.git.switch('master') g2repo.git.merge('--ff-only') print('git: updated to latest version') # Update or regress to a specific GSAS-II version. # this will always cause a "detached head" status elif updateType == 'regress': # --git-regress (updateType = 'regress') if g2repo.is_dirty(): print('Cannot regress a directory with locally-made changes') sys.exit() print(f'Regressing to git version {regressversion[:6]}') g2repo.git.checkout(regressversion) if gitUpdate: # now restart GSAS-II with the new version # G2scrpt = os.path.join(path2GSAS2,'GSASII.py') if project: print(f"Restart GSAS-II with project file {project!r}") # subprocess.Popen([sys.executable,G2scrpt,project]) else: print("Restart GSAS-II without a project file ") # subprocess.Popen([sys.executable,G2scrpt]) import GSASIIctrlGUI GSASIIctrlGUI.openInNewTerm(project) print ('exiting update process') sys.exit() else: # this is the old svn update process LoadConfig() import time time.sleep(1) # delay to give the main process a chance to exit # perform an update and restart GSAS-II try: project,version = sys.argv[1:3] except ValueError: project = None version = 'trunk' loc = os.path.dirname(__file__) if version == 'trunk': svnSwitch2branch('') elif '/' in version: svnSwitch2branch(version) elif version: print("Regress to version "+str(version)) svnUpdateDir(loc,version=version) else: print("Update to current version") svnUpdateDir(loc) ex = sys.executable if sys.platform == "darwin": # mac requires pythonw which is not always reported as sys.executable if os.path.exists(ex+'w'): ex += 'w' if project: print("Restart GSAS-II with project file "+str(project)) subprocess.Popen([ex,os.path.join(loc,'GSASII.py'),project]) else: print("Restart GSAS-II without a project file ") subprocess.Popen([ex,os.path.join(loc,'GSASII.py')]) print ('exiting update process') sys.exit()