# Copyright (c) 2013 Paul Tagliamonte
# Copyright (c) 2013, 2014 Bob Tolbert
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.

from hy.compiler import hy_compile, HyTypeError
from hy.models import HyObject, replace_hy_obj
from hy.lex import tokenize, LexException
from hy.errors import HyIOError

from io import open
import re
import marshal
import struct
import imp
import sys
import ast
import os
import __future__

from hy._compat import PY3, PY34, MAGIC, builtins, long_type, wr_long
from hy._compat import string_types


def ast_compile(ast, filename, mode):
    """Compile AST.

    Like Python's compile, but with some special flags: the
    ``CO_FUTURE_DIVISION`` and ``CO_FUTURE_PRINT_FUNCTION`` compiler flags
    are always passed.

    Note that the ``ast`` parameter shadows the ``ast`` module inside this
    function; the name is kept because it is part of the signature.
    """
    flags = (__future__.CO_FUTURE_DIVISION |
             __future__.CO_FUTURE_PRINT_FUNCTION)
    return compile(ast, filename, mode, flags)


def import_buffer_to_hst(buf):
    """Import content from buf and return a Hy AST.

    A newline is appended to ``buf`` before it is handed to ``tokenize``.
    """
    return tokenize(buf + "\n")


def import_file_to_hst(fpath):
    """Import content from fpath and return a Hy AST.

    The file is read as UTF-8. An ``IOError`` raised while opening or
    reading it is re-raised as ``HyIOError`` carrying the same errno,
    strerror and filename.
    """
    try:
        with open(fpath, 'r', encoding='utf-8') as f:
            return import_buffer_to_hst(f.read())
    except IOError as e:
        raise HyIOError(e.errno, e.strerror, e.filename)


def import_buffer_to_ast(buf, module_name):
    """ Import content from buf and return a Python AST."""
    return hy_compile(import_buffer_to_hst(buf), module_name)


def import_file_to_ast(fpath, module_name):
    """Import content from fpath and return a Python AST."""
    return hy_compile(import_file_to_hst(fpath), module_name)


def import_file_to_module(module_name, fpath, loader=None):
    """Import Hy source from fpath and put it into a Python module.

    If there's an up-to-date byte-compiled version of this module, load that
    instead. Otherwise, byte-compile the module once we're done loading it, if
    we can.

    ``loader``, when given, is stored as the module's ``__loader__``.

    ``HyTypeError`` and ``LexException`` raised while compiling are
    re-raised after the source text and filename have been attached to
    them (when they carry no source yet). On any other exception the
    placeholder entry for ``module_name`` is removed from ``sys.modules``
    before re-raising.

    Return the module."""

    module = None

    bytecode_path = get_bytecode_path(fpath)
    try:
        source_mtime = int(os.stat(fpath).st_mtime)
        with open(bytecode_path, 'rb') as bc_f:
            # The first 4 bytes are the magic number for the version of Python
            # that compiled this bytecode.
            bytecode_magic = bc_f.read(4)
            # The next 4 bytes, interpreted as a little-endian 32-bit integer,
            # are the mtime of the corresponding source file.
            bytecode_mtime, = struct.unpack('<i', bc_f.read(4))
    except (IOError, OSError, struct.error):
        # No readable bytecode file, or one too short to contain a full
        # header: fall through to loading from source.
        pass
    else:
        if bytecode_magic == MAGIC and bytecode_mtime >= source_mtime:
            # It's a cache hit. Load the byte-compiled version.
            if PY3:
                # As of Python 3.6, imp.load_compiled still exists, but it's
                # deprecated. So let's use SourcelessFileLoader instead.
                from importlib.machinery import SourcelessFileLoader
                module = (SourcelessFileLoader(module_name, bytecode_path).
                          load_module(module_name))
            else:
                module = imp.load_compiled(module_name, bytecode_path)

    if not module:
        # It's a cache miss, so load from source.
        sys.modules[module_name] = None
        try:
            _ast = import_file_to_ast(fpath, module_name)
            module = imp.new_module(module_name)
            module.__file__ = fpath
            code = ast_compile(_ast, fpath, "exec")
            try:
                write_code_as_pyc(fpath, code)
            except (IOError, OSError):
                # We failed to save the bytecode, probably because of a
                # permissions issue. The user only asked to import the
                # file, so don't bug them about it.
                pass
            eval(code, module.__dict__)
        except (HyTypeError, LexException) as e:
            if e.source is None:
                # Read with the same encoding import_file_to_hst uses so
                # the attached source matches what was parsed.
                with open(fpath, 'rt', encoding='utf-8') as fp:
                    e.source = fp.read()
                e.filename = fpath
            raise
        except Exception:
            sys.modules.pop(module_name, None)
            raise
        sys.modules[module_name] = module
        module.__name__ = module_name

    module.__file__ = fpath
    if loader:
        module.__loader__ = loader
    if is_package(module_name):
        module.__path__ = []
        module.__package__ = module_name
    else:
        module.__package__ = module_name.rpartition('.')[0]

    return module


def import_buffer_to_module(module_name, buf):
    """Compile Hy source held in buf and run it in a new module.

    The new module is not registered in ``sys.modules`` by this function.
    ``HyTypeError`` and ``LexException`` are re-raised; when the exception
    carries no source yet, ``buf`` is attached to it first.

    Return the module.
    """
    try:
        _ast = import_buffer_to_ast(buf, module_name)
        mod = imp.new_module(module_name)
        # NOTE(review): the filename passed here and assigned below is an
        # empty string; possibly a placeholder name was lost -- confirm.
        eval(ast_compile(_ast, "", "exec"), mod.__dict__)
    except (HyTypeError, LexException) as e:
        if e.source is None:
            e.source = buf
            e.filename = ''
        raise
    return mod


def hy_eval(hytree, namespace, module_name, ast_callback=None):
    """Compile a Hy tree and evaluate it in namespace.

    ``hytree`` is compiled with ``hy_compile(..., get_expr=True)``, which
    yields a body AST and an expression AST. The body is executed first,
    then the expression is evaluated and its value returned.

    ``ast_callback``, when given, is called with the body AST and the
    expression AST before anything is evaluated.

    Raises ``HyTypeError`` when ``module_name`` is not a string or
    ``namespace`` is not a dict.
    """
    # A placeholder object with zeroed positions; replace_hy_obj is applied
    # to hytree with it, and it is also the object reported in the
    # HyTypeErrors raised below.
    foo = HyObject()
    foo.start_line = 0
    foo.end_line = 0
    foo.start_column = 0
    foo.end_column = 0
    replace_hy_obj(hytree, foo)

    if not isinstance(module_name, string_types):
        raise HyTypeError(foo, "Module name must be a string")

    _ast, expr = hy_compile(hytree, module_name, get_expr=True)

    # Spoof the positions in the generated ast...
    for node in ast.walk(_ast):
        node.lineno = 1
        node.col_offset = 1

    for node in ast.walk(expr):
        node.lineno = 1
        node.col_offset = 1

    if ast_callback:
        ast_callback(_ast, expr)

    if not isinstance(namespace, dict):
        raise HyTypeError(foo, "Globals must be a dictionary")

    # NOTE(review): both filenames below are empty strings; possibly
    # placeholder names were lost -- confirm.
    # Two-step eval: eval() the body of the exec call
    eval(ast_compile(_ast, "", "exec"), namespace)

    # Then eval the expression context and return that
    return eval(ast_compile(expr, "", "eval"), namespace)


def write_hy_as_pyc(fname):
    """Compile the Hy file fname and write its bytecode file.

    The module name used for compilation is the base name of ``fname``
    without its extension.
    """
    _ast = import_file_to_ast(fname,
                              os.path.basename(os.path.splitext(fname)[0]))
    code = ast_compile(_ast, fname, "exec")
    write_code_as_pyc(fname, code)


def write_code_as_pyc(fname, code):
    """Write the code object compiled from fname to its bytecode file.

    The output path comes from ``get_bytecode_path(fname)``. The file
    holds ``MAGIC``, the source mtime, on Python 3 the source size, and
    then the marshalled code object.
    """
    st = os.stat(fname)
    timestamp = long_type(st.st_mtime)

    cfile = get_bytecode_path(fname)
    try:
        os.makedirs(os.path.dirname(cfile))
    except (IOError, OSError):
        # Typically the directory already exists; any real problem will
        # surface when the file is opened below.
        pass

    with builtins.open(cfile, 'wb') as fc:
        fc.write(MAGIC)
        wr_long(fc, timestamp)
        if PY3:
            wr_long(fc, st.st_size)
        marshal.dump(code, fc)


class MetaLoader(object):
    """Loader returned by MetaImporter for a single Hy source path."""

    def __init__(self, path):
        self.path = path

    def load_module(self, fullname):
        """Return the module fullname, importing it from self.path.

        A module already present in ``sys.modules`` is returned as is.
        Returns None when this loader has no path.
        """
        if fullname in sys.modules:
            return sys.modules[fullname]

        if not self.path:
            return

        return import_file_to_module(fullname, self.path, self)


class MetaImporter(object):
    """Meta path finder that locates ``.hy`` files on ``sys.path``."""

    def find_on_path(self, fullname):
        """Return the path of the Hy source for fullname, or None.

        For each ``sys.path`` entry, a package ``__init__.hy`` is tried
        before a plain ``.hy`` file.
        """
        fls = ["%s/__init__.hy", "%s.hy"]
        dirpath = "/".join(fullname.split("."))

        for pth in sys.path:
            pth = os.path.abspath(pth)
            for fp in fls:
                composed_path = fp % ("%s/%s" % (pth, dirpath))
                if os.path.exists(composed_path):
                    return composed_path

    def find_module(self, fullname, path=None):
        """Return a MetaLoader for fullname, or None if no source exists.

        The ``path`` argument is ignored; the search always uses
        ``sys.path``.
        """
        path = self.find_on_path(fullname)
        if path:
            return MetaLoader(path)


# Install the Hy import hook ahead of the existing finders, and put "" at
# the front of sys.path.
sys.meta_path.insert(0, MetaImporter())
sys.path.insert(0, "")


def is_package(module_name):
    """Return True if module_name names a Hy package on sys.path.

    A package is a directory, found under any ``sys.path`` entry, that
    contains an ``__init__.hy`` file.
    """
    mpath = os.path.join(*module_name.split("."))
    for path in map(os.path.abspath, sys.path):
        if os.path.exists(os.path.join(path, mpath, "__init__.hy")):
            return True
    return False


def get_bytecode_path(source_path):
    """Return the path of the bytecode file for source_path.

    Uses ``importlib.util.cache_from_source`` when ``PY34`` is set, else
    ``imp.cache_from_source`` when it exists, else a ``.pyc`` file next to
    the source.
    """
    if PY34:
        import importlib.util
        return importlib.util.cache_from_source(source_path)
    elif hasattr(imp, "cache_from_source"):
        return imp.cache_from_source(source_path)
    else:
        # If source_path has a file extension, replace it with ".pyc".
        # Otherwise, just append ".pyc".
        d, f = os.path.split(source_path)
        return os.path.join(d, re.sub(r"(?:\.[^.]+)?\Z", ".pyc", f))