lock_free/Map.fz
# This file is part of the Fuzion language implementation.
#
# The Fuzion language implementation is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, version 3 of the License.
#
# The Fuzion language implementation is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License along with The
# Fuzion language implementation. If not, see <https://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------
#
# Tokiwa Software GmbH, Germany
#
# Source code of Fuzion feature lock_free.Map
#
# Author: Michael Lill (michael.lill@tokiwa.software)
#
# -----------------------------------------------------------------------
# A Fuzion implementation of CTrie invented by Aleksandar Prokopec
# CTrie is a non-blocking concurrent hash trie
#
# reference paper: Concurrent Tries with Efficient Non-Blocking Snapshots
# https://aleksandar-prokopec.com/resources/docs/ctries-snapshot.pdf
#
# reference implementation in Scala: https://github.com/axel22/Ctries/
# on wikipedia: https://en.wikipedia.org/wiki/Ctrie
#
# Complexity (according to the paper):
# add, lookup, remove : O(log n)
# snapshot, amortized size retrieval, clear : O(1)
#
# While CTrie is lock-free and guaranteed to make progress as a whole
# it is not wait-free and thus no such guarantee is given per thread.
#
# NYI addif
#
# glossary:
# CTK => ctrie key
# CTV => ctrie value
# k => key
# v => value
# gen => generation
# lev => level
# bmp => bitmap
# idx => index
# W => 2^W-way branching factor
#
# a tomb node
# "a T-node is the last value assigned to an I-node"
tomb_node(CTK type : property.hashable, CTV type, sn singleton_node CTK CTV) is
redef as_string => "tomb_node($sn)"
as_list => sn.as_list
# a singleton node
# the node type containing actual data
singleton_node(CTK type : property.hashable, CTV type, k CTK, v CTV) is
redef as_string => "singleton_node($k, $v)"
as_list => [(k,v)].as_list
# an indirection or a singleton node
# these are put into a container node
branch(CTK type : property.hashable, CTV type) : choice (Indirection_Node CTK CTV) (singleton_node CTK CTV) is
redef as_string =>
match branch.this
indirection_node Indirection_Node => "$indirection_node"
singleton_node singleton_node => "$singleton_node"
as_list(f Indirection_Node CTK CTV -> Main_Node CTK CTV) =>
match branch.this
indirection_node Indirection_Node => indirection_node.as_list f
singleton_node singleton_node => singleton_node.as_list
# a container node
# consists of a bitmap of filled spaces and an array of child nodes
container_node(CTK type : property.hashable, CTV type, bmp u32, array array (branch CTK CTV), gen u64)
is
# update a child node and return a new container_node
update(pos u32, node branch CTK CTV, g u64) =>
container_node bmp (array.put pos.as_i32 node) g
# add a child node and return a new container_node
put(sn singleton_node CTK CTV, pos, flag u32, g u64) =>
container_node (bmp | flag) (array.insert pos.as_i32 sn).as_array g
# remove a child node and return a new container_node
remove(pos, flag u32, g u64)
pre debug: pos < array.length.as_u32
post array.length > result.array.length
=>
# NYI delete/remove should be in stdlib
tmp := array.take pos.as_i32 ++ array.drop (pos + 1).as_i32
container_node (bmp ^ flag) tmp.as_array g
# get item at pos in this container node
index [] (pos i32) branch CTK CTV =>
array[pos]
redef as_string => "container_node[gen=$gen]({array.as_string ", "})"
as_list(f Indirection_Node CTK CTV -> Main_Node CTK CTV) =>
array.flat_map (tuple CTK CTV) (b -> b.as_list f)
.as_list
# a prev_node is used in the generation aware compare and swap
#
# Main_Node.prev is initially nil
# when a Main_Node is replaced it is put into
# the successors `prev`-field.
#
prev_node(CTK type : property.hashable, CTV type) : choice (failed_node CTK CTV) (Main_Node CTK CTV) nil is
redef as_string =>
match prev_node.this
f lock_free.failed_node => f.as_string
m lock_free.Main_Node => m.as_string
nil => nil.as_string
is_nil =>
match prev_node.this
nil => true
* => false
# a container, tomb or linked list node, with a previous field
# to support the generational compare and swap
#
Main_Node(CTK type : property.hashable, CTV type, data choice (container_node CTK CTV) (tomb_node CTK CTV) (list_node CTK CTV), p prev_node CTK CTV) ref : property.equatable is
id := unique_id
# a previous node that gets set during a generational aware compare and set
prev := concur.atomic (prev_node CTK CTV) p
fixed redef type.equality(a, b Main_Node.this) bool =>
a.id = b.id
# compare and update `prev`
cas_prev(o,n prev_node CTK CTV) =>
prev.compare_and_set o n
redef as_string =>
s := match data
container_node container_node => "$container_node"
tomb_node tomb_node => "$tomb_node"
list_node list_node => "$list_node"
"Main_Node[prev={prev.read}]($s)"
as_list(f Indirection_Node CTK CTV -> Main_Node CTK CTV)
pre match prev.read
nil => true
* => false
=>
match data
c container_node => c.as_list f
t tomb_node => t.as_list
l list_node => l.as_list
# a failed node where the previous indirection node contains a main node
failed_node(CTK type : property.hashable, CTV type, prev Main_Node CTK CTV) is
redef as_string =>
"failed_node($prev)"
# shorthand for creating a new indirection node from Main_Node and gen
indirection_node(CTK type : property.hashable, CTV type, data Main_Node CTK CTV, gen u64) =>
Indirection_Node (concur.atomic data) gen
# an indirection node
Indirection_Node(CTK type : property.hashable, CTV type, data concur.atomic (Main_Node CTK CTV), gen u64) ref : property.equatable is
id := unique_id
fixed redef type.equality(a, b Indirection_Node.this) =>
a.id = b.id
# compare and update
cas(old_n, new_n Main_Node CTK CTV) =>
data.compare_and_set old_n new_n
redef as_string => "Indirection_Node[gen=$gen]({data.read})"
as_list(f Indirection_Node CTK CTV -> Main_Node CTK CTV) =>
(f Indirection_Node.this).as_list f
# a linked list node
# NYI instead of Sequence we should use something like the original implementation ListMap(Scala).
list_node(CTK type : property.hashable, CTV type, from Sequence (singleton_node CTK CTV)) : Sequence (tuple CTK CTV)
pre from ∀ (sn -> (from.filter (snn -> (hash sn.k) = (hash snn.k))).count = 1)
is
redef as_list => from
.map sn->(sn.k, sn.v)
.as_list
# is this sequence known to be finite? For infinite sequences, features like
# count diverge.
#
redef finite => trit.yes
redef as_string => "list_node({from.as_string ", "})"
# find k in linked nodes
find_key(k CTK) choice restart not_found CTV =>
match from.drop_while(sn -> sn.k != k).first
nil => not_found
sn singleton_node => sn.v
# unit type to indicate an operation did not succeed yet
# and thus needs a restart
restart is
# unit type to indicate success
ctrie_ok is
# unit type to indicate when value to lookup/remove is not found
not_found is
public redef as_string => "not found"
# descriptor for double-compare-single-swap operation
Rdcss_Descriptor(CTK type : property.hashable, CTV type, ov Indirection_Node CTK CTV, exp Main_Node CTK CTV, nv Indirection_Node CTK CTV) ref
pre exp.prev.read.is_nil
is
# this field does not have to be atomic
# but it probably does not hurt either.
committed := concur.atomic false
redef as_string =>
"Rdcss_Descriptor($ov, $exp, $nv)"
# the root node of the ctrie, normally an indirection node.
# in case the root node is currently replaced it is a Rdcss_Descriptor temporarily.
root_node(CTK type : property.hashable, CTV type) : choice (Indirection_Node CTK CTV) (Rdcss_Descriptor CTK CTV) is
redef as_string =>
s => match root_node.this
i lock_free.Indirection_Node => i.as_string
r lock_free.Rdcss_Descriptor => r.as_string
"root_node($s)"
# the ctrie
private:public Map(public CTK type : property.hashable,
public CTV type,
root concur.atomic (root_node CTK CTV),
public read_only bool) ref : container.Mutable_Map CTK CTV
is
# the data structure as human readable string
# for debugging purposes
as_string_internal =>
"Ctrie[ro=$read_only]({root.read})"
# compare and swap root of the ctrie
cas_root(ov, nv root_node CTK CTV) =>
root.compare_and_set ov nv
# copy an indirection node to a new generation
# this is used when
# 1) taking a snapshot and the root node needs to be copied to a new generation
# 2) copying a container node to a new generation
#
copy_to_gen(i Indirection_Node CTK CTV, new_gen u64)
post result.data.read.prev.read.is_nil
=>
indirection_node (gcas_read i) new_gen
# copy this container_node to new generation
renew(cn container_node CTK CTV, new_gen u64) =>
copy => cn
.array
.map (branch CTK CTV) x->
match x
i Indirection_Node =>
copy_to_gen i new_gen
sn singleton_node => sn
.as_array
container_node cn.bmp copy new_gen
# complete the double compare and swap
# of the root node
rdcss_complete(abortable bool) Indirection_Node CTK CTV
=>
match root.read
# there is nothing to do
n Indirection_Node => n
desc Rdcss_Descriptor =>
if abortable
if cas_root desc desc.ov
desc.ov
else
rdcss_complete abortable
else
old_main := gcas_read desc.ov
if old_main = desc.exp
if cas_root desc desc.nv
desc.committed.write true
desc.nv
else
rdcss_complete abortable
else
if cas_root desc desc.ov
desc.ov
else
rdcss_complete abortable
# read root
# if root is currently a descriptor we are in the middle
# of a double compare and swap.
# Then (try) committing the descriptor first
read_root(abortable bool) Indirection_Node CTK CTV
=>
match root.read
n Indirection_Node => n
Rdcss_Descriptor => rdcss_complete abortable
# read root none abortably
read_root => read_root false
# do a double compare and swap of root node
# 1. try compare and swap root
# 2. if successful complete committing the descriptor
rdcss_root(desc Rdcss_Descriptor CTK CTV) =>
if cas_root desc.ov desc
_ := rdcss_complete false
desc.committed.read
else
false
# completes the generation sensitive compare and set
gcas_commit(i Indirection_Node CTK CTV, m Main_Node CTK CTV) Main_Node CTK CTV =>
prev := m.prev.read
# abortably read root and get the current gen
root_gen := (read_root true).gen
match prev
nil => m
fn failed_node =>
if i.cas m fn.prev
fn.prev
else
gcas_commit i i.data.read
n Main_Node =>
if root_gen = i.gen && !read_only
if m.cas_prev prev nil
m
else
gcas_commit i m
else
_ := m.cas_prev prev (failed_node n)
gcas_commit i i.data.read
# read `data`, if prev is set commit first
gcas_read(i Indirection_Node CTK CTV) Main_Node CTK CTV =>
m := i.data.read
match m.prev.read
nil => m
* => gcas_commit i m
# generation aware compare and set
# semantics on the indirection node i
# o is compared and swapped with n
# but this compare and swap only succeeds if the root
# generation does not change while this compare and
# set is taking place.
gcas(i Indirection_Node CTK CTV, o Main_Node CTK CTV, n choice (container_node CTK CTV) (tomb_node CTK CTV) (list_node CTK CTV)) choice restart ctrie_ok
pre
match o.prev.read
nil => true
* => false
=>
nn := Main_Node n o
if i.cas o nn
_ := gcas_commit i nn
match nn.prev.read
nil => ctrie_ok
* => restart
else
restart
# the width of the branching factor, 2^5 = 32
width := u32 5
# convert u64 hash to u32 hash
hash0(h u64) u32 =>
(h >> 32).low32bits ^ h.low32bits
# returns flag and the position in the container_node for given params
flagpos(hash u32, lev u32, bmp u32) tuple u32 u32 =>
idx := (hash >> lev) & 0x1F
flag := u32 1 << idx
mask := flag -° 1
pos := (bmp & mask).ones_count.as_u32
(flag, pos)
# compress a container node
compress(cn container_node CTK CTV, lev u32, g u64) =>
narr => cn.array.map_to_array (branch CTK CTV) n->
match n
i Indirection_Node =>
match (gcas_read i).data
// resurrect
tn tomb_node => tn.sn
* => i
sn singleton_node => sn
contract (container_node cn.bmp narr g) lev
# contract a container node
contract(cn container_node CTK CTV, lev u32) choice (container_node CTK CTV) (tomb_node CTK CTV) (list_node CTK CTV) =>
if (lev > 0) && (cn.array.length = 1)
match cn[0]
sn singleton_node => tomb_node sn
Indirection_Node => cn
else
cn
# clean an indirection node:
# compress contained container node
clean(nd option (Indirection_Node CTK CTV), lev u32) =>
_ := nd.bind unit i->
m := gcas_read i
_ := match m.data
c container_node =>
_ := gcas i m (compress c lev i.gen)
* =>
restart
# turns this: container_node -> Indirection_Node -> tomb_node -> singleton_node
# into this: container_node -> singleton_node
clean_parent(parent option (Indirection_Node CTK CTV), i Indirection_Node CTK CTV, hash, lev u32, gen u64) unit =>
_ := parent >>= p->
m := gcas_read p
match m.data
cn container_node =>
(flag, pos) := flagpos hash lev cn.bmp
if (cn.bmp & flag) != 0
match cn[pos.as_i32]
inode Indirection_Node =>
if inode = i
match (gcas_read i).data
tn tomb_node =>
ncn := cn.update pos tn.sn i.gen
match gcas p m (contract ncn lev-width)
restart =>
if read_root.gen = gen
clean_parent p i hash lev gen
ctrie_ok =>
* =>
* =>
* =>
nil
# takes two single nodes and returns either
# Main_Node -> container_node -> singleton_nodes
# or
# Main_Node -> list_node -> singleton_nodes
# or recurse
# Main_Node -> container_node -> Indirection_Node -> dual x y
dual(x, y singleton_node CTK CTV, lev u32, gen u64) =>
d choice (container_node CTK CTV) (tomb_node CTK CTV) (list_node CTK CTV) =>
# NYI why 35??
if lev < 35
xidx := ((hash0 (hash x.k)) >> lev) & 0x1f
yidx := ((hash0 (hash y.k)) >> lev) & 0x1f
bmp := (u32 1 << xidx) | (u32 1 << yidx)
if xidx = yidx
# NYI: BUG: type inference
sub_node := indirection_node CTK CTV (dual x y (lev + width) gen) gen
# NYI: BUG: type inference
container_node CTK CTV bmp [sub_node] gen
else
if (xidx < yidx)
# NYI: BUG: type inference
container_node CTK CTV bmp [x, y] gen
else
# NYI: BUG: type inference
container_node CTK CTV bmp [y, x] gen
else
list_node [(singleton_node x.k x.v), (singleton_node y.k y.v)]
Main_Node d nil
# lookup key k
public redef index [] (k CTK) option CTV =>
r := read_root
match lookup r k 0 nil r.gen
restart =>
Map.this[k]
not_found =>
nil
v CTV =>
v
# try lookup key in ctrie
# may fail and result in a restart
lookup(i Indirection_Node CTK CTV, k CTK, lev u32, parent option (Indirection_Node CTK CTV), gen u64) choice restart not_found CTV
=>
m := gcas_read i
match m.data
cn container_node =>
(flag, pos) := flagpos (hash0 (hash k)) lev cn.bmp
if (cn.bmp & flag) = 0
not_found
else
match cn[pos.as_i32]
sin Indirection_Node =>
if read_only || gen = sin.gen
lookup sin k (lev + width) i gen
else
match gcas i m (renew cn gen)
ctrie_ok => lookup i k lev parent gen
restart => restart
sn singleton_node => if sn.k = k then sn.v else not_found
tn tomb_node =>
if read_only
(if k = tn.sn.k then tn.sn.v else not_found)
else
clean parent (lev - width)
ln list_node => ln.find_key k
# add key value
# if key is already present value is updated
public redef put(k CTK, v CTV) unit =>
r := read_root
match put r k v 0 nil r.gen
restart =>
put k v
ctrie_ok =>
# try adding an element to the ctrie
# may fail and result in a restart
put(i Indirection_Node CTK CTV, k CTK, v CTV, lev u32, parent option (Indirection_Node CTK CTV), gen u64) choice restart ctrie_ok
=>
m := gcas_read i
match m.data
cn container_node =>
(flag, pos) := flagpos (hash0 (hash k)) lev cn.bmp
if (cn.bmp & flag) = 0
ncn := (if cn.gen = i.gen then cn else renew cn i.gen).put (singleton_node k v) pos flag i.gen
gcas i m ncn
else
match cn[pos.as_i32]
ci Indirection_Node =>
if ci.gen = gen
put ci k v lev+width i gen
else
match gcas i m (renew cn gen)
ctrie_ok => put i k v lev parent gen
restart => restart
sn singleton_node =>
if sn.k != k
nin := indirection_node (dual sn (singleton_node k v) (lev + width) i.gen) i.gen
ncn := (if cn.gen = i.gen then cn else renew cn i.gen).update pos nin i.gen
gcas i m ncn
else
gcas i m (cn.update pos (singleton_node k v) i.gen)
tomb_node =>
clean parent (lev - width)
ln list_node =>
gcas i m (list_node ([singleton_node k v] ++ (ln.from.filter (sn -> sn.k != k))))
# remove key from ctrie
public redef remove(k CTK) option CTV =>
r := read_root
match remove r k 0 nil r.gen
restart => remove k
not_found => nil
v CTV => v
# try remove an element from the ctrie
# may fail and result in a restart
remove(i Indirection_Node CTK CTV, k CTK, lev u32, parent option (Indirection_Node CTK CTV), gen u64) choice restart not_found CTV
=>
m := gcas_read i
match m.data
cn container_node =>
(flag, pos) := flagpos (hash0 (hash k)) lev cn.bmp
if (cn.bmp & flag) = 0
not_found
else
res choice restart not_found CTV := match cn[pos.as_i32]
sin Indirection_Node =>
if sin.gen = gen
remove sin k (lev + width) i gen
else
match gcas i m (renew cn gen)
ctrie_ok => remove i k lev parent gen
restart => restart
sn singleton_node =>
if sn.k != k
not_found
else
match gcas i m (contract (cn.remove pos flag i.gen) lev)
ctrie_ok => sn.v
restart => restart
match res
CTV =>
match (gcas_read i).data
tomb_node => clean_parent parent i (hash0 (hash k)) lev gen
* =>
* =>
res
tomb_node =>
clean parent (lev - width)
ln list_node =>
fln := list_node ln.from.filter(sn -> sn.k != k)
match gcas i m (if fln.from.count = 1 then tomb_node fln.from.first.get else fln)
ctrie_ok => ln.find_key k
restart => restart
# the size of the ctrie
public redef size =>
items.count
# take a snapshot of the ctrie
public snapshot(read_only bool) =>
r := read_root
expmain := gcas_read r
new_gen => unique_id
descriptor := Rdcss_Descriptor r expmain (copy_to_gen r new_gen)
if rdcss_root descriptor
# new ctrie by changing generation of r
Map (concur.atomic (root_node CTK CTV) (copy_to_gen r new_gen)) snapshot.this.read_only
else
snapshot snapshot.this.read_only
# a snapshot of the ctrie as sequence auf key-value tuples
public redef items Sequence (tuple CTK CTV) =>
snapshot true
.read_root
.as_list (i -> gcas_read i)
# this ctrie as a persistent map
#
# a snapshot is taken every time
# the map is modified.
#
# NYI: BUG: #3646 make public when snapshots work
as_persistent_map container.Persistent_Map CTK CTV =>
s := snapshot false
ref : container.Persistent_Map CTK CTV
public redef size => s.size
public redef index [] (k CTK) => s[k]
public redef items => s.items
public redef put(k CTK, v CTV) =>
sp := s.snapshot false
sp.put k v
sp.as_persistent_map
# initialize a new ctrie
public fixed redef type.empty =>
initial_gen => unique_id
initial_root_node lock_free.root_node CTK CTV :=
lock_free.indirection_node (lock_free.Main_Node (lock_free.container_node CTK CTV 0 [] initial_gen) nil) initial_gen
lock_free.Map (concur.atomic initial_root_node) false
# lock_free.Map.from_entries -- routine to initialize a ctrie from a sequence of key value tuples
#
# This feature creates an instance of a ctrie.
#
# example: lock_free.Map.from_entries [(key1, value1), (key2, value2)]
#
public type.from_entries(kvs Sequence (tuple CTK CTV)) lock_free.Map CTK CTV =>
m := (lock_free.Map CTK CTV).empty
kvs.for_each (kv -> m.put kv.0 kv.1)
m