- import requests
- import validators
- from pathlib import Path
- from os import path, walk
- from urllib.parse import urlsplit, urlunsplit
- from tqdm import tqdm
# Remote manifest listing the curated model directory (name -> url entries).
MODELS_MANIFEST = 'https://ollama.ai/api/models'
# Local directory where downloaded model .bin files are cached.
MODELS_CACHE_PATH = Path.home() / '.ollama' / 'models'
def models(models_home=None, *args, **kwargs):
    """Yield the names of models already present in the local cache.

    Args:
        models_home: directory to scan for cached models. Defaults to
            MODELS_CACHE_PATH. (Fixes the original signature, which
            silently ignored the directory argument that pull() passes
            positionally.)

    Yields:
        str: the filename stem of each cached ``.bin`` file, recursively.
    """
    root = MODELS_CACHE_PATH if models_home is None else models_home
    for _, _, files in walk(root):
        for file in files:
            base, ext = path.splitext(file)
            if ext == '.bin':
                yield base
# get the url of the model from our curated directory
def get_url_from_directory(model):
    """Look up *model* in the hosted model directory.

    Args:
        model: model name to search the directory for.

    Returns:
        The directory's download URL for the model, or *model* unchanged
        when the directory has no entry with that name.

    Raises:
        requests.HTTPError: if the manifest request fails.
    """
    # Timeout keeps a hung manifest server from blocking pull() forever.
    response = requests.get(MODELS_MANIFEST, timeout=30)
    response.raise_for_status()
    directory = response.json()
    for model_info in directory:
        if model_info.get('name') == model:
            return model_info.get('url')
    return model
def download_from_repo(url, file_name):
    """Resolve a HuggingFace-style repo URL and download its .bin weights.

    Args:
        url: repo URL, optionally containing a ``/tree/<branch>`` segment.
        file_name: base name for the cached file; when empty it is derived
            from the repo location.

    Returns:
        The local path of the downloaded file (see download_file).
    """
    parts = urlsplit(url)
    path_parts = parts.path.split('/tree/')
    if len(path_parts) == 1:
        location = path_parts[0]
        branch = 'main'  # default branch when the URL names none
    else:
        location, branch = path_parts
    location = location.strip('/')
    if file_name == '':
        file_name = path.basename(location).lower()
    # Ask the hub API for the file listing of this repo/branch.
    download_url = urlunsplit(
        (
            'https',
            parts.netloc,
            f'/api/models/{location}/tree/{branch}',
            parts.query,
            parts.fragment,
        )
    )
    # Timeout prevents an unresponsive host from hanging the pull.
    response = requests.get(download_url, timeout=30)
    response.raise_for_status()
    json_response = response.json()
    download_url, file_size = find_bin_file(json_response, location, branch)
    return download_file(download_url, file_name, file_size)
def find_bin_file(json_response, location, branch):
    """Find the .bin weights file in a repo tree listing.

    Scans every entry; when several .bin files exist, the last one listed
    wins.

    Args:
        json_response: list of file-info dicts from the hub tree API.
        location: ``owner/repo`` path of the repository.
        branch: branch name to resolve the file against.

    Returns:
        Tuple of (direct download URL, file size in bytes).

    Raises:
        Exception: if the listing contains no .bin file.
    """
    match_url, match_size = None, 0
    for entry in json_response:
        if entry.get('type') != 'file':
            continue
        entry_path = entry.get('path')
        if not entry_path.endswith('.bin'):
            continue
        match_url = f'https://huggingface.co/{location}/resolve/{branch}/{entry_path}'
        match_size = entry.get('size')
    if match_url is None:
        raise Exception('No model found')
    return match_url, match_size
def download_file(download_url, file_name, file_size, models_home=None):
    """Download (or resume) a model blob into the local cache.

    Args:
        download_url: direct URL of the .bin file.
        file_name: base name (without extension) for the cached file.
        file_size: expected total size in bytes; used to detect a
            completed download and skip re-fetching.
        models_home: cache directory; defaults to MODELS_CACHE_PATH.

    Returns:
        pathlib.Path of the cached .bin file.
    """
    cache_dir = MODELS_CACHE_PATH if models_home is None else Path(models_home)
    # BUG FIX: the original `cache_dir / file_name + '.bin'` parsed as
    # `(Path / str) + str`, which raises TypeError — the extension must be
    # joined to the name before the path join.
    local_filename = cache_dir / f'{file_name}.bin'
    # Resume from however many bytes are already on disk.
    first_byte = local_filename.stat().st_size if local_filename.exists() else 0
    if first_byte >= file_size:
        return local_filename
    print(f'Pulling {file_name}...')
    header = {'Range': f'bytes={first_byte}-'} if first_byte != 0 else {}
    # Timeout so a stalled connection cannot hang the pull indefinitely.
    response = requests.get(download_url, headers=header, stream=True, timeout=60)
    response.raise_for_status()
    total_size = int(response.headers.get('content-length', 0)) + first_byte
    cache_dir.mkdir(parents=True, exist_ok=True)  # cache dir may not exist yet
    with open(local_filename, 'ab' if first_byte else 'wb') as file, tqdm(
        total=total_size,
        unit='iB',
        unit_scale=True,
        unit_divisor=1024,
        initial=first_byte,
        ascii=' ==',
        bar_format='Downloading [{bar}] {percentage:3.2f}% {rate_fmt}{postfix}',
    ) as bar:
        for data in response.iter_content(chunk_size=1024):
            size = file.write(data)
            bar.update(size)
    return local_filename
def pull(model_name, *args, **kwargs):
    """Resolve a model reference to a local file, downloading if needed.

    Args:
        model_name: a filesystem path, a URL, or a curated-directory name.

    Returns:
        The local filename of the model.

    Raises:
        Exception: if the name cannot be resolved to a model.
    """
    if path.exists(model_name):
        # a file on the filesystem is being specified
        return model_name
    # check the remote model location and see if it needs to be downloaded
    url, file_name = model_name, ""
    if not (validators.url(url) or url.startswith('huggingface.co')):
        url = get_url_from_directory(model_name)
        file_name = model_name
    if not url.startswith(('http://', 'https://')):
        url = f'https://{url}'
    if not validators.url(url):
        if model_name in models(MODELS_CACHE_PATH):
            # the model is already downloaded, and specified by name
            return model_name
        raise Exception(f'Unknown model {model_name}')
    return download_from_repo(url, file_name)