Fuzion Logo
fuzion-lang.dev — The Fuzion Language Portal
JavaScript seems to be disabled. Functionality is limited.

concur/thread_pool.fz


# This file is part of the Fuzion language implementation.
#
# The Fuzion language implementation is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, version 3 of the License.
#
# The Fuzion language implementation is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License along with The
# Fuzion language implementation.  If not, see <https://www.gnu.org/licenses/>.


# -----------------------------------------------------------------------
#
#  Tokiwa Software GmbH, Germany
#
#  Source code of Fuzion standard library feature concur.thread_pool
#
# -----------------------------------------------------------------------



# simple, fixed size thread pool
#
# use `concur.thread_pool.env.submit` to
# submit a task to the thread_pool.
#
public thread_pool(R type, size i32, code ()->R) outcome R =>
  (thread_pool size) ! code


# concrete implementation of concur.Future
# meant to run on a thread thread_pool
#
Computable_Future(
  # result type of the computation
  T type,
  # the thread pool to use for subsequent computation
  TP type : concur.thread_pool,
  # the code of the future
  task ()->T) ref : concur.Future T is

  cnd := concur.sync.condition.new.val # NYI: error handling

  compute_result := concur.atomic (option T) .new nil

  compute =>
    cr := task.call
    cnd.synchronized ()->
      compute_result.write cr
      check cnd.broadcast

  public redef is_done bool  ! atomic_access => compute_result.read.exists

  public redef get T ! atomic_access =>
    cnd.synchronized ()->
      if !is_done
        check cnd.wait

    compute_result.read.val

  public redef and_then(T2 type, new_task T->T2) concur.Future T2 ! atomic_access, TP =>
    TP.env.submit T2 ()->(new_task get)



# simple, fixed size thread pool
#
private:public thread_pool(size i32) : effect
pre debug: size > 0
is

  bq := Blocking_Queue ()->unit

  thrds := array size _->
      concur.threads.spawn ()->
        for x := bq.dequeue
        while x.exists
          x.val.call


  # submit a task to the thread pool
  #
  # returns false if the pool is already shutting down
  #
  public submit(task ()->unit) bool ! atomic_access =>
    bq.enqueue task


  # submit a task to the thread pool
  #
  # returns a future if successful or panics in case
  # the thread pool has already shut down.
  #
  public submit(T type, task ()->T) concur.Future T ! atomic_access =>
    res := Computable_Future _ thread_pool.this task
    if bq.enqueue ()->res.compute
      res
    else
      panic "Thread pool has already shut down."


  # stop accepting jobs,
  # notify all threads to wake up,
  # wait for all threads to finish
  #
  public redef finally unit ! atomic_access =>

    # initiate shutdown of thread pool
    # i.e. stop accepting jobs
    #
    bq.close

    # wait for all threads to finish
    thrds.for_each t->
      t.join

last changed: 2026-03-05