Fuzion Logo
fuzion-lang.dev — The Fuzion Language Portal
JavaScript seems to be disabled. Functionality is limited.

concur/Blocking_Queue.fz


# This file is part of the Fuzion language implementation.
#
# The Fuzion language implementation is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, version 3 of the License.
#
# The Fuzion language implementation is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License along with The
# Fuzion language implementation.  If not, see <https://www.gnu.org/licenses/>.


# -----------------------------------------------------------------------
#
#  Tokiwa Software GmbH, Germany
#
#  Source code of Fuzion standard library feature Blocking_Queue
#
# -----------------------------------------------------------------------


# NYI: replace by better queue implementation, e.g. https://arxiv.org/pdf/2305.07229
module Blocking_Queue(T type, cnd concur.sync.condition) ref is

  # is closed for taking any new elements?
  #
  module is_closed := concur.atomic false

  data := concur.atomic (array T) []


  # enqueue an element
  # if queue is already closed, returns false
  # otherwise enqueues the element and returns true
  #
  module enqueue(el T) bool =>
    if is_closed.read
      false
    else
      for old := data.read
          new := old.put old.length el
      while !data.compare_and_set old new
      else
        cnd.synchronized ()->
          # signal a waiting thread to pick up the work
          check cnd.signal
        true


  # dequeue an element
  # returns nil, if queue is closed and empty
  # otherwise returns the dequeued element
  #
  module dequeue option T =>
    o := data.read
    if o.is_empty
      if is_closed.read
        nil
      else
        cnd.synchronized ()->
          # check the two conditions again, otherwise we MUST NOT wait
          if !is_closed.read && data.read.is_empty
            # NYI: BUG: possible dead lock? what if we write to data in the meantime?
            # we should use TLA+ or similar to prove correctness
            # https://www.hillelwayne.com/post/list-of-tla-examples/
            check cnd.wait
        dequeue
    else
      res := o[0]
      n := array o.count-1 i->o[i+1]
      if data.compare_and_set o n
        res
      else
        dequeue
last changed: 2025-06-17