Fuzion Logo
fuzion-lang.dev — The Fuzion Language Portal
JavaScript seems to be disabled. Functionality is limited.

lock_free/Map.fz


# This file is part of the Fuzion language implementation.
#
# The Fuzion language implementation is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, version 3 of the License.
#
# The Fuzion language implementation is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License along with The
# Fuzion language implementation.  If not, see <https://www.gnu.org/licenses/>.


# -----------------------------------------------------------------------
#
#  Tokiwa Software GmbH, Germany
#
#  Source code of Fuzion feature lock_free.Map
#
#  Author: Michael Lill (michael.lill@tokiwa.software)
#
# -----------------------------------------------------------------------

# A Fuzion implementation of CTrie invented by Aleksandar Prokopec
# CTrie is a non-blocking concurrent hash trie
#
# reference paper: Concurrent Tries with Efficient Non-Blocking Snapshots
# https://aleksandar-prokopec.com/resources/docs/ctries-snapshot.pdf
#
# reference implementation in Scala: https://github.com/axel22/Ctries/
# on wikipedia: https://en.wikipedia.org/wiki/Ctrie
#
# Complexity (according to the paper):
# add, lookup, remove                     : O(log n)
# snapshot, amortized size retrieval, clear  : O(1)
#
# While CTrie is lock-free and guaranteed to make progress as a whole
# it is not wait-free and thus no such guarantee is given per thread.
#
# NYI addif
#
# glossary:
# CTK => ctrie key
# CTV => ctrie value
# k   => key
# v   => value
# gen => generation
# lev => level
# bmp => bitmap
# idx => index
# W   => 2^W-way branching factor
#


# a tomb node
# "a T-node is the last value assigned to an I-node"
tomb_node(CTK type : property.hashable, CTV type, sn singleton_node CTK CTV) is
  redef as_string => "tomb_node($sn)"

  as_list => sn.as_list



# a singleton node
# the node type containing actual data
singleton_node(CTK type : property.hashable, CTV type, k CTK, v CTV) is
  redef as_string => "singleton_node($k, $v)"

  as_list => [(k,v)].as_list



# an indirection or a singleton node
# these are put into a container node
branch(CTK type : property.hashable, CTV type) : choice (Indirection_Node CTK CTV) (singleton_node CTK CTV) is
  redef as_string =>
    match branch.this
      indirection_node Indirection_Node => "$indirection_node"
      singleton_node singleton_node => "$singleton_node"

  as_list(f Indirection_Node CTK CTV -> Main_Node CTK CTV) =>
    match branch.this
      indirection_node Indirection_Node => indirection_node.as_list f
      singleton_node singleton_node => singleton_node.as_list



# a container node
# consists of a bitmap of filled spaces and an array of child nodes
container_node(CTK type : property.hashable, CTV type, bmp u32, array array (branch CTK CTV), gen u64)
is

  # update a child node and return a new container_node
  update(pos u32, node branch CTK CTV, g u64) =>
    container_node bmp (array.put pos.as_i32 node) g

  # add a child node and return a new container_node
  put(sn singleton_node CTK CTV, pos, flag u32, g u64) =>
    container_node (bmp | flag) (array.insert pos.as_i32 sn).as_array g

  # remove a child node and return a new container_node
  remove(pos, flag u32, g u64)
  pre debug: pos < array.length.as_u32
  post array.length > result.array.length
  =>
    # NYI delete/remove should be in stdlib
    tmp := array.take pos.as_i32 ++ array.drop (pos + 1).as_i32
    container_node (bmp ^ flag) tmp.as_array g

  # get item at pos in this container node
  index [] (pos i32) branch CTK CTV =>
    array[pos]

  redef as_string => "container_node[gen=$gen]({array.as_string ", "})"

  as_list(f Indirection_Node CTK CTV -> Main_Node CTK CTV) =>
    array.flat_map (tuple CTK CTV) (b -> b.as_list f)
         .as_list



# a prev_node is used in the generation aware compare and swap
#
# Main_Node.prev is initially nil
# when a Main_Node is replaced it is put into
# the successors `prev`-field.
#
prev_node(CTK type : property.hashable, CTV type) : choice (failed_node CTK CTV) (Main_Node CTK CTV) nil is

  redef as_string =>
    match prev_node.this
      f lock_free.failed_node => f.as_string
      m lock_free.Main_Node => m.as_string
      nil => nil.as_string

  is_nil =>
    match prev_node.this
      nil => true
      * => false


# a container, tomb or linked list node, with a previous field
# to support the generational compare and swap
#
Main_Node(CTK type : property.hashable, CTV type, data choice (container_node CTK CTV) (tomb_node CTK CTV) (list_node CTK CTV), p prev_node CTK CTV) ref : property.equatable is

  id := unique_id

  # a previous node that gets set during a generational aware compare and set
  prev := concur.atomic (prev_node CTK CTV) p

  fixed redef type.equality(a, b Main_Node.this) bool =>
    a.id = b.id

  # compare and update `prev`
  cas_prev(o,n prev_node CTK CTV) =>
    prev.compare_and_set o n

  redef as_string =>
    s := match data
      container_node container_node => "$container_node"
      tomb_node tomb_node => "$tomb_node"
      list_node list_node => "$list_node"
    "Main_Node[prev={prev.read}]($s)"

  as_list(f Indirection_Node CTK CTV -> Main_Node CTK CTV)
  pre match prev.read
    nil => true
    * => false
  =>
    match data
      c container_node => c.as_list f
      t tomb_node      => t.as_list
      l list_node      => l.as_list


# a failed node where the previous indirection node contains a main node
failed_node(CTK type : property.hashable, CTV type, prev Main_Node CTK CTV) is

  redef as_string =>
    "failed_node($prev)"


# shorthand for creating a new indirection node from Main_Node and gen
indirection_node(CTK type : property.hashable, CTV type, data Main_Node CTK CTV, gen u64) =>
  Indirection_Node (concur.atomic data) gen


# an indirection node
Indirection_Node(CTK type : property.hashable, CTV type, data concur.atomic (Main_Node CTK CTV), gen u64) ref : property.equatable is

  id := unique_id

  fixed redef type.equality(a, b Indirection_Node.this) =>
    a.id = b.id

  # compare and update
  cas(old_n, new_n Main_Node CTK CTV) =>
    data.compare_and_set old_n new_n

  redef as_string => "Indirection_Node[gen=$gen]({data.read})"

  as_list(f Indirection_Node CTK CTV -> Main_Node CTK CTV) =>
    (f Indirection_Node.this).as_list f


# a linked list node
# NYI instead of Sequence we should use something like the original implementation ListMap(Scala).
list_node(CTK type : property.hashable, CTV type, from Sequence (singleton_node CTK CTV)) : Sequence (tuple CTK CTV)
pre from ∀ (sn -> (from.filter (snn -> (hash sn.k) = (hash snn.k))).count = 1)
is
  redef as_list => from
    .map sn->(sn.k, sn.v)
    .as_list

  # is this sequence known to be finite?  For infinite sequences, features like
  # count diverge.
  #
  redef finite => trit.yes

  redef as_string => "list_node({from.as_string ", "})"

  # find k in linked nodes
  find_key(k CTK) choice restart not_found CTV =>
    match from.drop_while(sn -> sn.k != k).first
      nil => not_found
      sn singleton_node => sn.v


# unit type to indicate an operation did not succeed yet
# and thus needs a restart
restart is

# unit type to indicate success
ctrie_ok is

# unit type to indicate when value to lookup/remove is not found
not_found is
  public redef as_string => "not found"


# descriptor for double-compare-single-swap operation
Rdcss_Descriptor(CTK type : property.hashable, CTV type, ov Indirection_Node CTK CTV, exp Main_Node CTK CTV, nv Indirection_Node CTK CTV) ref
pre exp.prev.read.is_nil
is

  # this field does not have to be atomic
  # but it probably does not hurt either.
  committed := concur.atomic false

  redef as_string =>
    "Rdcss_Descriptor($ov, $exp, $nv)"


# the root node of the ctrie, normally an indirection node.
# in case the root node is currently replaced it is a Rdcss_Descriptor temporarily.
root_node(CTK type : property.hashable, CTV type) : choice (Indirection_Node CTK CTV) (Rdcss_Descriptor CTK CTV) is

  redef as_string =>
    s => match root_node.this
      i lock_free.Indirection_Node => i.as_string
      r lock_free.Rdcss_Descriptor => r.as_string
    "root_node($s)"


# the ctrie
private:public Map(public CTK type : property.hashable,
                   public CTV type,
                   root concur.atomic (root_node CTK CTV),
                   public read_only bool) ref : container.Mutable_Map CTK CTV
is

  # the data structure as human readable string
  # for debugging purposes
  as_string_internal =>
    "Ctrie[ro=$read_only]({root.read})"

  # compare and swap root of the ctrie
  cas_root(ov, nv root_node CTK CTV) =>
    root.compare_and_set ov nv


  # copy an indirection node to a new generation
  # this is used when
  # 1) taking a snapshot and the root node needs to be copied to a new generation
  # 2) copying a container node to a new generation
  #
  copy_to_gen(i Indirection_Node CTK CTV, new_gen u64)
  post result.data.read.prev.read.is_nil
  =>
    indirection_node (gcas_read i) new_gen


  # copy this container_node to new generation
  renew(cn container_node CTK CTV, new_gen u64) =>
    copy => cn
      .array
      .map (branch CTK CTV) x->
        match x
          i Indirection_Node =>
            copy_to_gen i new_gen
          sn singleton_node => sn
      .as_array
    container_node cn.bmp copy new_gen


  # complete the double compare and swap
  # of the root node
  rdcss_complete(abortable bool) Indirection_Node CTK CTV
  =>
    match root.read
      # there is nothing to do
      n Indirection_Node => n

      desc Rdcss_Descriptor =>
        if abortable
          if cas_root desc desc.ov
            desc.ov
          else
            rdcss_complete abortable
        else
          old_main := gcas_read desc.ov
          if old_main = desc.exp
            if cas_root desc desc.nv
              desc.committed.write true
              desc.nv
            else
              rdcss_complete abortable
          else
            if cas_root desc desc.ov
              desc.ov
            else
              rdcss_complete abortable


  # read root
  # if root is currently a descriptor we are in the middle
  # of a double compare and swap.
  # Then (try) committing the descriptor first
  read_root(abortable bool) Indirection_Node CTK CTV
  =>
    match root.read
      n Indirection_Node => n
      Rdcss_Descriptor   => rdcss_complete abortable


  # read root none abortably
  read_root => read_root false


  # do a double compare and swap of root node
  # 1. try compare and swap root
  # 2. if successful complete committing the descriptor
  rdcss_root(desc Rdcss_Descriptor CTK CTV) =>
    if cas_root desc.ov desc
      _ := rdcss_complete false
      desc.committed.read
    else
      false

  # completes the generation sensitive compare and set
  gcas_commit(i Indirection_Node CTK CTV, m Main_Node CTK CTV) Main_Node CTK CTV =>
    prev := m.prev.read
    # abortably read root and get the current gen
    root_gen := (read_root true).gen
    match prev
      nil => m
      fn failed_node =>
        if i.cas m fn.prev
          fn.prev
        else
          gcas_commit i i.data.read
      n Main_Node =>
        if root_gen = i.gen && !read_only
          if m.cas_prev prev nil
            m
          else
            gcas_commit i m
        else
          _ := m.cas_prev prev (failed_node n)
          gcas_commit i i.data.read

  # read `data`, if prev is set commit first
  gcas_read(i Indirection_Node CTK CTV) Main_Node CTK CTV =>
    m := i.data.read
    match m.prev.read
      nil => m
      * => gcas_commit i m

  # generation aware compare and set
  # semantics on the indirection node i
  # o is compared and swapped with n
  # but this compare and swap only succeeds if the root
  # generation does not change while this compare and
  # set is taking place.
  gcas(i Indirection_Node CTK CTV, o Main_Node CTK CTV, n choice (container_node CTK CTV) (tomb_node CTK CTV) (list_node CTK CTV)) choice restart ctrie_ok
  pre
    match o.prev.read
      nil => true
      *   => false
  =>
    nn := Main_Node n o
    if i.cas o nn
      _ := gcas_commit i nn
      match nn.prev.read
        nil => ctrie_ok
        * => restart
    else
      restart


  # the width of the branching factor, 2^5 = 32
  width := u32 5


  # convert u64 hash to u32 hash
  hash0(h u64) u32 =>
    (h >> 32).low32bits ^ h.low32bits


  # returns flag and the position in the container_node for given params
  flagpos(hash u32, lev u32, bmp u32) tuple u32 u32 =>
    idx := (hash >> lev) & 0x1F
    flag := u32 1 << idx
    mask := flag -° 1
    pos := (bmp & mask).ones_count.as_u32
    (flag, pos)


  # compress a container node
  compress(cn container_node CTK CTV, lev u32, g u64) =>
    narr => cn.array.map_to_array (branch CTK CTV) n->
      match n
        i Indirection_Node =>
          match (gcas_read i).data
            // resurrect
            tn tomb_node => tn.sn
            * => i
        sn singleton_node => sn
    contract (container_node cn.bmp narr g) lev


  # contract a container node
  contract(cn container_node CTK CTV, lev u32) choice (container_node CTK CTV) (tomb_node CTK CTV) (list_node CTK CTV) =>
    if (lev > 0) && (cn.array.length = 1)
      match cn[0]
        sn singleton_node => tomb_node sn
        Indirection_Node  => cn
    else
      cn


  # clean an indirection node:
  # compress contained container node
  clean(nd option (Indirection_Node CTK CTV), lev u32) =>
    _ := nd.bind unit i->
      m := gcas_read i
      _ := match m.data
        c container_node =>
          _ := gcas i m (compress c lev i.gen)
        * =>
    restart


  # turns this: container_node -> Indirection_Node -> tomb_node -> singleton_node
  # into  this: container_node -> singleton_node
  clean_parent(parent option (Indirection_Node CTK CTV), i Indirection_Node CTK CTV, hash, lev u32, gen u64) unit =>
    _ := parent >>= p->
      m := gcas_read p
      match m.data
        cn container_node =>
          (flag, pos) := flagpos hash lev cn.bmp
          if (cn.bmp & flag) != 0
            match cn[pos.as_i32]
              inode Indirection_Node =>
                if inode = i
                  match (gcas_read i).data
                    tn tomb_node =>
                      ncn := cn.update pos tn.sn i.gen
                      match gcas p m (contract ncn lev-width)
                        restart =>
                          if read_root.gen = gen
                            clean_parent p i hash lev gen
                        ctrie_ok =>
                    * =>
              * =>
        * =>
      nil


  # takes two single nodes and returns either
  # Main_Node -> container_node -> singleton_nodes
  # or
  # Main_Node -> list_node -> singleton_nodes
  # or recurse
  # Main_Node -> container_node -> Indirection_Node -> dual x y
  dual(x, y singleton_node CTK CTV, lev u32, gen u64) =>
    d choice (container_node CTK CTV) (tomb_node CTK CTV) (list_node CTK CTV) =>
      # NYI why 35??
      if lev < 35
        xidx := ((hash0 (hash x.k)) >> lev) & 0x1f
        yidx := ((hash0 (hash y.k)) >> lev) & 0x1f
        bmp := (u32 1 << xidx) | (u32 1 << yidx)
        if xidx = yidx
          # NYI: BUG: type inference
          sub_node := indirection_node CTK CTV (dual x y (lev + width) gen) gen
          # NYI: BUG: type inference
          container_node CTK CTV bmp [sub_node] gen
        else
          if (xidx < yidx)
            # NYI: BUG: type inference
            container_node CTK CTV bmp [x, y] gen
          else
            # NYI: BUG: type inference
            container_node CTK CTV bmp [y, x] gen
      else
        list_node [(singleton_node x.k x.v), (singleton_node y.k y.v)]

    Main_Node d nil


  # lookup key k
  public redef index [] (k CTK) option CTV =>
    r := read_root
    match lookup r k 0 nil r.gen
      restart =>
        Map.this[k]
      not_found =>
        nil
      v CTV =>
        v


  # try lookup key in ctrie
  # may fail and result in a restart
  lookup(i Indirection_Node CTK CTV, k CTK, lev u32, parent option (Indirection_Node CTK CTV), gen u64) choice restart not_found CTV
  =>
    m := gcas_read i
    match m.data
      cn container_node =>
        (flag, pos) := flagpos (hash0 (hash k)) lev cn.bmp
        if (cn.bmp & flag) = 0
          not_found
        else
          match cn[pos.as_i32]
            sin Indirection_Node =>
              if read_only || gen = sin.gen
                lookup sin k (lev + width) i gen
              else
                match gcas i m (renew cn gen)
                  ctrie_ok => lookup i k lev parent gen
                  restart => restart
            sn singleton_node => if sn.k = k then sn.v else not_found
      tn tomb_node =>
        if read_only
          (if k = tn.sn.k then tn.sn.v else not_found)
        else
          clean parent (lev - width)
      ln list_node => ln.find_key k


  # add key value
  # if key is already present value is updated
  public redef put(k CTK, v CTV) unit =>
    r := read_root
    match put r k v 0 nil r.gen
      restart =>
        put k v
      ctrie_ok =>


  # try adding an element to the ctrie
  # may fail and result in a restart
  put(i Indirection_Node CTK CTV, k CTK, v CTV, lev u32, parent option (Indirection_Node CTK CTV), gen u64) choice restart ctrie_ok
  =>
    m := gcas_read i
    match m.data
      cn container_node =>
        (flag, pos) := flagpos (hash0 (hash k)) lev cn.bmp
        if (cn.bmp & flag) = 0
          ncn := (if cn.gen = i.gen then cn else renew cn i.gen).put (singleton_node k v) pos flag i.gen
          gcas i m ncn
        else
          match cn[pos.as_i32]
            ci Indirection_Node =>
              if ci.gen = gen
                put ci k v lev+width i gen
              else
                match gcas i m (renew cn gen)
                  ctrie_ok => put i k v lev parent gen
                  restart => restart
            sn singleton_node =>
              if sn.k != k
                nin := indirection_node (dual sn (singleton_node k v) (lev + width) i.gen) i.gen
                ncn := (if cn.gen = i.gen then cn else renew cn i.gen).update pos nin i.gen
                gcas i m ncn
              else
                gcas i m (cn.update pos (singleton_node k v) i.gen)
      tomb_node =>
        clean parent (lev - width)
      ln list_node =>
        gcas i m (list_node ([singleton_node k v] ++ (ln.from.filter (sn -> sn.k != k))))


  # remove key from ctrie
  public redef remove(k CTK) option CTV =>
    r := read_root
    match remove r k 0 nil r.gen
      restart   => remove k
      not_found => nil
      v CTV     => v


  # try remove an element from the ctrie
  # may fail and result in a restart
  remove(i Indirection_Node CTK CTV, k CTK, lev u32, parent option (Indirection_Node CTK CTV), gen u64) choice restart not_found CTV
  =>
    m := gcas_read i
    match m.data
      cn container_node =>
        (flag, pos) := flagpos (hash0 (hash k)) lev cn.bmp
        if (cn.bmp & flag) = 0
          not_found
        else
          res choice restart not_found CTV := match cn[pos.as_i32]
            sin Indirection_Node =>
              if sin.gen = gen
                remove sin k (lev + width) i gen
              else
                match gcas i m (renew cn gen)
                  ctrie_ok => remove i k lev parent gen
                  restart => restart
            sn singleton_node =>
              if sn.k != k
                not_found
              else
                match gcas i m (contract (cn.remove pos flag i.gen) lev)
                  ctrie_ok => sn.v
                  restart => restart
          match res
            CTV =>
              match (gcas_read i).data
                tomb_node => clean_parent parent i (hash0 (hash k)) lev gen
                * =>
            * =>
          res
      tomb_node =>
        clean parent (lev - width)
      ln list_node =>
        fln := list_node ln.from.filter(sn -> sn.k != k)
        match gcas i m (if fln.from.count = 1 then tomb_node fln.from.first.get else fln)
          ctrie_ok => ln.find_key k
          restart => restart


  # the size of the ctrie
  public redef size =>
    items.count



  # take a snapshot of the ctrie
  public snapshot(read_only bool) =>
    r := read_root
    expmain := gcas_read r
    new_gen => unique_id
    descriptor := Rdcss_Descriptor r expmain (copy_to_gen r new_gen)
    if rdcss_root descriptor
      # new ctrie by changing generation of r
      Map (concur.atomic (root_node CTK CTV) (copy_to_gen r new_gen)) snapshot.this.read_only
    else
      snapshot snapshot.this.read_only


  # a snapshot of the ctrie as sequence auf key-value tuples
  public redef items Sequence (tuple CTK CTV) =>
    snapshot true
      .read_root
      .as_list (i -> gcas_read i)


  # this ctrie as a persistent map
  #
  # a snapshot is taken every time
  # the map is modified.
  #
  # NYI: BUG: #3646 make public when snapshots work
  as_persistent_map container.Persistent_Map CTK CTV =>
    s := snapshot false
    ref : container.Persistent_Map CTK CTV
      public redef size => s.size
      public redef index [] (k CTK) => s[k]
      public redef items => s.items
      public redef put(k CTK, v CTV) =>
        sp := s.snapshot false
        sp.put k v
        sp.as_persistent_map


  # initialize a new ctrie
  public fixed redef type.empty =>

    initial_gen => unique_id

    initial_root_node lock_free.root_node CTK CTV :=
      lock_free.indirection_node (lock_free.Main_Node (lock_free.container_node CTK CTV 0 [] initial_gen) nil) initial_gen

    lock_free.Map (concur.atomic initial_root_node) false


  # lock_free.Map.from_entries -- routine to initialize a ctrie from a sequence of key value tuples
  #
  # This feature creates an instance of a ctrie.
  #
  # example: lock_free.Map.from_entries [(key1, value1), (key2, value2)]
  #
  public type.from_entries(kvs Sequence (tuple CTK CTV)) lock_free.Map CTK CTV =>
    m := (lock_free.Map CTK CTV).empty
    kvs.for_each (kv -> m.put kv.0 kv.1)
    m
last changed: 2025-01-23