@@ -26,13 +26,13 @@
 // Expose the llama server as a callable extern "C" API
 llama_server_context *llama = NULL;
-std::atomic<bool> ext_server_running(false);
 std::thread ext_server_thread;
 
 void llama_server_init(ext_server_params *sparams, ext_server_resp_t *err) {
   assert(err != NULL && sparams != NULL);
   log_set_target(stderr);
   if (!sparams->verbose_logging) {
+    server_verbose = true;
     log_disable();
   }
@@ -122,18 +122,23 @@ void llama_server_start() {
   assert(llama != NULL);
   // TODO mutex to protect thread creation
   ext_server_thread = std::thread([&]() {
-    ext_server_running = true;
     try {
       LOG_TEE("llama server main loop starting\n");
       ggml_time_init();
-      while (ext_server_running.load()) {
-        if (!llama->update_slots()) {
-          LOG_TEE(
-              "unexpected error in llama server update_slots - exiting main "
-              "loop\n");
-          break;
-        }
-      }
+      llama->queue_tasks.on_new_task(std::bind(
+          &llama_server_context::process_single_task, llama, std::placeholders::_1));
+      llama->queue_tasks.on_finish_multitask(std::bind(
+          &llama_server_context::on_finish_multitask, llama, std::placeholders::_1));
+      llama->queue_tasks.on_all_tasks_finished(std::bind(
+          &llama_server_context::run_on_all_tasks_finished, llama));
+      llama->queue_results.on_multitask_update(std::bind(
+          &llama_server_queue::update_multitask,
+          &llama->queue_tasks,
+          std::placeholders::_1,
+          std::placeholders::_2,
+          std::placeholders::_3
+      ));
+      llama->queue_tasks.start_loop();
     } catch (std::exception &e) {
       LOG_TEE("caught exception in llama server main loop: %s\n", e.what());
     } catch (...) {
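
The hunk above swaps the hand-rolled `update_slots()` polling loop (guarded by the `ext_server_running` atomic) for llama.cpp's task queue: the worker thread registers its callbacks and then blocks inside `queue_tasks.start_loop()` until `terminate()` is called from `llama_server_stop()`. The sketch below is a minimal, hypothetical illustration of that pattern (the `task_queue` and `task` types are stand-ins, not the real `llama_server_queue`), showing why no separate running flag is needed once the queue owns the loop's lifetime.

```cpp
// Minimal sketch of a callback-driven task loop, standing in for the real
// llama_server_queue (hypothetical types; illustration only).
#include <condition_variable>
#include <cstdio>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

struct task { int id; };

class task_queue {
  std::mutex mtx;
  std::condition_variable cv;
  std::deque<task> pending;
  std::function<void(task &)> on_task;
  bool running = true;

public:
  void on_new_task(std::function<void(task &)> cb) { on_task = std::move(cb); }

  void post(task t) {
    { std::lock_guard<std::mutex> lk(mtx); pending.push_back(t); }
    cv.notify_one();
  }

  // Runs on the worker thread and blocks until terminate(); replaces the
  // old "while (ext_server_running.load())" polling loop.
  void start_loop() {
    std::unique_lock<std::mutex> lk(mtx);
    for (;;) {
      cv.wait(lk, [&] { return !pending.empty() || !running; });
      while (!pending.empty()) {
        task t = pending.front();
        pending.pop_front();
        lk.unlock();
        on_task(t);          // dispatch outside the lock
        lk.lock();
      }
      if (!running) break;   // terminate() drains pending work, then exits
    }
  }

  void terminate() {
    { std::lock_guard<std::mutex> lk(mtx); running = false; }
    cv.notify_all();
  }
};

int main() {
  task_queue q;
  q.on_new_task([](task &t) { std::printf("processed task %d\n", t.id); });
  std::thread worker([&] { q.start_loop(); });  // plays the role of ext_server_thread
  q.post({1});
  q.post({2});
  q.terminate();                                // plays the role of llama_server_stop()
  worker.join();
}
```

`terminate()` flips the running flag under the lock and wakes the loop, which still drains whatever is queued before returning; that matches the draining behaviour `llama_server_stop()` now relies on.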
@@ -146,13 +151,10 @@ void llama_server_start() {
 
 void llama_server_stop() {
   assert(llama != NULL);
-  // TODO - too verbose, remove once things are solid
-  LOG_TEE("requesting llama server shutdown\n");
-  ext_server_running = false;
-
-  // unblocks the update_slots() loop so it can clean up and exit
-  llama->request_cancel(0);
-
+  LOG_TEE("\ninitiating shutdown - draining remaining tasks...\n");
+  // This may take a while for any pending tasks to drain
+  // TODO - consider a timeout to cancel tasks if it's taking too long
+  llama->queue_tasks.terminate();
   ext_server_thread.join();
   delete llama;
   llama = NULL;
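
`llama_server_stop()` now depends on `queue_tasks.terminate()` letting the worker drain its outstanding tasks before `join()` returns, and the new TODO notes that this can block for a long time. Purely as an illustration of that TODO (not part of this change), a bounded wait could be layered on top with `std::future`, assuming the same `llama`, `ext_server_thread`, and `LOG_TEE` from this file:

```cpp
#include <chrono>
#include <future>

// Hypothetical variant of llama_server_stop() that logs if draining takes too
// long; the worker thread still has to be joined before the process exits.
void llama_server_stop_with_timeout() {
  LOG_TEE("\ninitiating shutdown - draining remaining tasks...\n");
  llama->queue_tasks.terminate();

  auto drained = std::async(std::launch::async, [] { ext_server_thread.join(); });
  if (drained.wait_for(std::chrono::seconds(30)) == std::future_status::timeout) {
    LOG_TEE("shutdown still draining after 30s - consider cancelling pending tasks\n");
  }
  drained.wait();  // cannot skip the join; block until the worker really exits
  delete llama;
  llama = NULL;
}
```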
@@ -165,7 +167,9 @@ void llama_server_completion(const char *json_req, ext_server_resp_t *resp) {
   resp->msg[0] = '\0';
   try {
     json data = json::parse(json_req);
-    resp->id = llama->request_completion(data, false, false, -1);
+    resp->id = llama->queue_tasks.get_new_id();
+    llama->queue_results.add_waiting_task_id(resp->id);
+    llama->request_completion(resp->id, data, false, false, -1);
   } catch (std::exception &e) {
     snprintf(resp->msg, resp->msg_len, "exception %s", e.what());
   } catch (...) {
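
With the queue-based API the task id is allocated by the caller (`queue_tasks.get_new_id()`) and registered via `queue_results.add_waiting_task_id()` before `request_completion()` enqueues any work, so a task that finishes quickly cannot publish a result nobody is waiting for; the registration is released later by `remove_waiting_task_id()` in the next-result, cancel, and embedding paths. Gathering the calls from this diff into one place, the lifecycle looks roughly like this (hypothetical `submit_and_wait()` helper, error handling omitted, same types as in this file):

```cpp
// Sketch only: the same queue_tasks / queue_results calls used in this diff,
// collected into a single hypothetical helper for reference.
static int submit_and_wait(llama_server_context *llama, const json &data) {
  const int id = llama->queue_tasks.get_new_id();   // caller allocates the task id
  llama->queue_results.add_waiting_task_id(id);     // register interest before enqueueing
  llama->request_completion(id, data, false, false, -1);

  for (;;) {
    task_result result = llama->queue_results.recv(id);  // blocks for the next chunk
    if (result.error || result.stop) {
      llama->request_cancel(id);
      llama->queue_results.remove_waiting_task_id(id);   // always release the registration
      return result.error ? -1 : 0;
    }
    // ... otherwise stream result.result_json back to the caller ...
  }
}
```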
@@ -183,16 +187,22 @@ void llama_server_completion_next_result(const int task_id,
   resp->json_resp = NULL;
   std::string result_json;
   try {
-    task_result result = llama->next_result(task_id);
+    task_result result = llama->queue_results.recv(task_id);
     result_json =
         result.result_json.dump(-1, ' ', false, json::error_handler_t::replace);
     resp->id = result.id;
     resp->stop = result.stop;
     resp->error = result.error;
     if (result.error) {
+      LOG_TEE("next result cancel on error\n");
       llama->request_cancel(task_id);
+      LOG_TEE("next result removing waiting task ID: %d\n", task_id);
+      llama->queue_results.remove_waiting_task_id(task_id);
     } else if (result.stop) {
+      LOG_TEE("next result cancel on stop\n");
       llama->request_cancel(task_id);
+      LOG_TEE("next result removing waiting task ID: %d\n", task_id);
+      llama->queue_results.remove_waiting_task_id(task_id);
     }
   } catch (std::exception &e) {
     resp->error = true;
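
From the outside, a consumer of this extern "C" API drives the same lifecycle through `llama_server_completion()` and `llama_server_completion_next_result()`, looping until `stop` or `error` is set on the result. A rough caller sketch follows; the layout of `ext_server_resp_t`, the name `ext_server_task_result_t`, and the request JSON shape are assumptions, since they live in ext_server.h rather than in this diff.

```cpp
#include <cstdio>
// #include "ext_server.h"  // declares the extern "C" API used below

// Hypothetical caller of the streaming completion API; struct and field
// layouts are assumed from context, not shown in this diff.
void stream_completion_example() {
  char msg[2048];
  msg[0] = '\0';
  ext_server_resp_t resp;
  resp.id = -1;
  resp.msg = msg;                     // assumed: caller-provided error buffer
  resp.msg_len = sizeof(msg);

  llama_server_completion("{\"prompt\": \"why is the sky blue?\"}", &resp);
  if (resp.msg[0] != '\0') {          // the wrapper fills msg on failure
    std::fprintf(stderr, "completion failed: %s\n", resp.msg);
    return;
  }

  for (;;) {
    ext_server_task_result_t result;  // assumed struct name from ext_server.h
    llama_server_completion_next_result(resp.id, &result);
    if (result.error) {
      std::fprintf(stderr, "completion returned an error\n");
      break;
    }
    std::printf("%s\n", result.json_resp);  // one JSON chunk of the streamed response
    // release result.json_resp with the matching helper from ext_server.h
    // (not shown in this diff) before asking for the next chunk
    if (result.stop) break;  // final chunk: the wrapper has already cancelled and deregistered
  }
}
```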
@@ -223,6 +233,7 @@ void llama_server_completion_cancel(const int task_id, ext_server_resp_t *err) {
   err->msg[0] = '\0';
   try {
     llama->request_cancel(task_id);
+    llama->queue_results.remove_waiting_task_id(task_id);
   } catch (std::exception &e) {
     err->id = -1;
     snprintf(err->msg, err->msg_len, "exception %s", e.what());
@@ -307,13 +318,15 @@ void llama_server_embedding(const char *json_req, char **json_resp,
     } else {
       prompt = "";
     }
-    const int task_id = llama->request_completion(
-        {{"prompt", prompt}, {"n_predict", 0}}, false, true, -1);
-    task_result result = llama->next_result(task_id);
+    const int task_id = llama->queue_tasks.get_new_id();
+    llama->queue_results.add_waiting_task_id(task_id);
+    llama->request_completion(task_id, {{"prompt", prompt}, {"n_predict", 0}}, false, true, -1);
+    task_result result = llama->queue_results.recv(task_id);
     std::string result_json = result.result_json.dump();
     const std::string::size_type size = result_json.size() + 1;
     *json_resp = new char[size];
     snprintf(*json_resp, size, "%s", result_json.c_str());
+    llama->queue_results.remove_waiting_task_id(task_id);
   } catch (std::exception &e) {
     err->id = -1;
     snprintf(err->msg, err->msg_len, "exception %s", e.what());
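
One detail worth noting in the embedding path: `*json_resp` is allocated with `new char[size]`, so the caller has to release it with a matching `delete[]` (presumably via a release helper declared in ext_server.h, which this diff does not show), never with `free()`. A minimal hypothetical caller, with the same assumed `ext_server_resp_t` layout as above:

```cpp
#include <cstdio>

// Hypothetical caller of llama_server_embedding(); the "content" request
// field is taken from the surrounding code not shown in this hunk.
void embedding_example() {
  char msg[2048];
  msg[0] = '\0';
  ext_server_resp_t err;
  err.id = 0;
  err.msg = msg;
  err.msg_len = sizeof(msg);

  char *json_resp = NULL;
  llama_server_embedding("{\"content\": \"why is the sky blue?\"}", &json_resp, &err);
  if (err.id < 0) {
    std::fprintf(stderr, "embedding failed: %s\n", err.msg);
    return;
  }
  std::printf("%s\n", json_resp);  // JSON containing the embedding vector
  // The buffer came from new char[]: release it through the library's helper
  // (or delete[] it directly); plain free() would be undefined behaviour.
  delete[] json_resp;
}
```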