Skip to content

Commit

Permalink
Add gen-do-lines file method that works with clozure.
Browse files Browse the repository at this point in the history
  • Loading branch information
m00natic committed Sep 9, 2018
1 parent 92f3b1c commit 49355df
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 23 deletions.
7 changes: 4 additions & 3 deletions README.org
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ SQL-like operations over DBs with fields/columns with fixed number of
ASCII bytes. It's extensible but so far supports only flat file DBs.

This is not a DB engine, each query is treated on its own without
caches, indexes etc. That said, it tries to generate near optimal
specialized code which is then fed to the compiler before getting run.
See [[https://m00natic.github.io/lisp/manual-jit.html][Uniform Structured Syntax, Metaprogramming and Run-time Compilation]].
persistent caches, indexes etc. That said, it tries to generate near
optimal specialized code which is then fed to the compiler before
getting run. See [[https://m00natic.github.io/lisp/manual-jit.html][Uniform Structured Syntax, Metaprogramming and
Run-time Compilation]].

** Declare DBs

Expand Down
2 changes: 1 addition & 1 deletion fdbq.asd
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
:description "SQL-like querying over fixed-field DBs."
:author "Andrey Kotlarski <[email protected]>"
:license "BSD-3"
:version "0.3"
:version "0.4"
:depends-on (:cl-ppcre #:ascii-strings #:lparallel #:alexandria)
:serial t
:components ((:file "package")
Expand Down
140 changes: 122 additions & 18 deletions storage-file.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ OFFSET-VAR is symbol representing the current offset in the db buffer."
:reduce-fn '+ :jobs jobs
:result-var 'result :result-initform 0 :result-type 'fixnum)))

#-ccl
(defmethod gen-do-lines ((spec spec-file) line-var body
&key (result-var (gensym)) result-type result-initform
(jobs 1) reduce-fn
Expand Down Expand Up @@ -154,19 +155,16 @@ to raw byte buffer and current line offset within it respectively."
`(ignore ,portion-id)))
,(if threading?
`(bt:with-lock-held (,file-lock)
(setf ,take-count (floor (read-sequence ,buffer-var ,ins) ,line-size)
(setf ,take-count (read-sequence ,buffer-var ,ins)
,portion-id (incf ,job-count)))
`(setf ,take-count (floor (read-sequence ,buffer-var ,ins)
,line-size)))
(loop for ,offset-var fixnum from 0 by ,line-size
repeat ,take-count
`(setf ,take-count (read-sequence ,buffer-var ,ins)))
(loop for ,offset-var fixnum from 0 below ,take-count by ,line-size
do ,@body)
,(if threading?
`(cons (cons ,portion-id ,result-var)
(when (= ,take-count ,(/ buffer-size line-size))
(when (= ,take-count ,buffer-size)
,job-id))
`(values ,result-var (= ,take-count
,(/ buffer-size line-size)))))))
`(values ,result-var (= ,take-count ,buffer-size))))))
,(cond
(threading?
`(let* ((result ,result-initform)
Expand All @@ -189,16 +187,16 @@ to raw byte buffer and current line offset within it respectively."
(if job-id
(lparallel:submit-task chan #'mapper job-id)
(setf more? nil))
(push res ready-res)
#1=(loop for res = (assoc next ready-res)
while res
do (locally
(declare (type (cons fixnum ,result-type)
res))
(setf result (,reduce-fn result (cdr res))
ready-res (delete next ready-res
:key #'car)))
(incf next))))
(push res ready-res))
#1=(loop for res = (assoc next ready-res)
while res
do (locally
(declare (type (cons fixnum ,result-type)
res))
(setf result (,reduce-fn result (cdr res))
ready-res (delete next ready-res
:key #'car)))
(incf next)))
(lparallel:do-fast-receives (res chan (1- ,jobs))
(push (car res) ready-res)
#1#)))
Expand All @@ -219,3 +217,109 @@ to raw byte buffer and current line offset within it respectively."
do (multiple-value-bind (res more) (mapper 0)
(declare (ignore res))
(setf more? more)))))))))))

#+ccl
(defmethod gen-do-lines ((spec spec-file) line-var body
&key (result-var (gensym)) result-type result-initform
(jobs 1) reduce-fn
buffer-var offset-var)
"File db iteration. BUFFER-VAR and OFFSET-VAR get bound
to raw byte buffer and current line offset within it respectively."
(let* ((line-size (1+ (spec-size spec))) ;add 1 for newline
(buffer-size (if (< line-size *buffer-size*)
(* line-size (floor *buffer-size* line-size))
line-size))
(threading? (< 1 jobs)))
(alexandria:with-gensyms (buffer-array line-array take-end job-id portion-id)
`(let ((,buffer-array (make-array ,jobs :element-type '(simple-array ascii:ub-char
(,buffer-size))
:initial-contents
(list ,@(loop repeat jobs
collect `(make-array ,buffer-size
:element-type 'ascii:ub-char)))))
(,line-array (make-array ,jobs :element-type '(simple-base-string ,(1- line-size))
:initial-contents
(list ,@(loop repeat jobs
collect `(make-array ,(1- line-size)
:element-type 'base-char))))))
(declare ,@(if threading?
`((dynamic-extent ,line-array))
`((dynamic-extent ,buffer-array ,line-array))))
(flet ((mapper (,take-end ,job-id ,portion-id)
(declare (optimize (speed 3) (debug 0) (safety 0) (compilation-speed 0))
(type fixnum ,take-end ,job-id)
,(if threading?
`(type fixnum ,portion-id)
`(ignore ,portion-id)))
(let ((,line-var (aref ,line-array ,job-id))
(,result-var ,result-initform)
(,buffer-var (aref ,buffer-array ,job-id)))
(declare ,(if result-type
`(type ,result-type ,result-var)
`(ignorable ,result-var))
(type (simple-base-string ,(1- line-size)) ,line-var)
(ignorable ,line-var)
(type (simple-array ascii:ub-char (,buffer-size)) ,buffer-var))
(loop for ,offset-var fixnum from 0 below ,take-end by ,line-size
do ,@body)
,(if threading?
`(cons (cons ,portion-id ,result-var)
,job-id)
result-var))))
(with-open-file (ins ,(spec-path spec) :direction :input
:element-type 'ascii:ub-char)
,(cond
(threading?
`(let* ((result ,result-initform)
(lparallel:*kernel* (lparallel:make-kernel ,jobs))
(chan (lparallel:make-channel)))
,(when result-type
`(declare (type ,result-type result)))
(unwind-protect
(let ((portion-count 0)
(next 1)
(ready-res nil))
(declare (type fixnum portion-count next)
(type list ready-res))
(loop for i fixnum from 0 below ,jobs
for bytes fixnum = (read-sequence (aref ,buffer-array i) ins)
until (zerop bytes)
do (lparallel:submit-task chan #'mapper bytes i
(incf portion-count)))
(when (= ,jobs portion-count)
(loop with more? = t
while more?
do (destructuring-bind (res . job-id)
(lparallel:receive-result chan)
(let ((bytes (read-sequence (aref ,buffer-array job-id) ins)))
(if (zerop bytes)
(setf more? nil)
(lparallel:submit-task chan #'mapper bytes job-id
(incf portion-count))))
(push res ready-res))
#1=(loop for res = (assoc next ready-res)
while res
do (locally
(declare (type (cons fixnum ,result-type)
res))
(setf result (,reduce-fn result (cdr res))
ready-res (delete next ready-res
:key #'car)))
(incf next)))
(lparallel:do-fast-receives (res chan
(min portion-count (1- ,jobs)))
(push (car res) ready-res)
#1#)))
(lparallel:end-kernel))
result))
(reduce-fn
`(let ((result ,result-initform))
,(when result-type
`(declare (type ,result-type result)))
(loop for bytes fixnum = (read-sequence (aref ,buffer-array 0) ins)
until (zerop bytes)
do (setf result (,reduce-fn result (mapper bytes 0 0))))
result))
(t `(loop for bytes fixnum = (read-sequence (aref ,buffer-array 0) ins)
until (zerop bytes)
do (mapper bytes 0 0))))))))))
2 changes: 1 addition & 1 deletion utils.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ SPEC holds field offset details."
:reduce-fn '+ :jobs jobs
:result-var 'result :result-initform 0 :result-type 'fixnum)))

(proclaim '(inline append-vec))
(declaim (inline append-vec))

(defun append-vec (vec1 vec2)
"Append VEC2 to the end of VEC1."
Expand Down

0 comments on commit 49355df

Please sign in to comment.