Commit 019b80b9 authored by matt24smith's avatar matt24smith
Browse files

improved status messages when decoding

parent e45ca681
Pipeline #4977 passed with stages
in 2 minutes and 32 seconds
__version__ = "1.0.20"
__version__ = "1.0.21"
......@@ -55,10 +55,11 @@ pub async fn main() -> Result<(), Error> {
let start = Instant::now();
// array tuples containing (dbpath, filepath)
let mut n = 0;
//let mut n = 0;
let mut path_arr = vec![];
for file in args.files {
n += 1;
//n += 1;
/*
if n <= args.start {
continue;
} else if n > args.end {
......@@ -66,6 +67,8 @@ pub async fn main() -> Result<(), Error> {
} else {
path_arr.push((std::path::PathBuf::from(&args.dbpath), file));
}
*/
path_arr.push((std::path::PathBuf::from(&args.dbpath), file));
}
// create a future for the database call
......@@ -85,8 +88,6 @@ pub async fn main() -> Result<(), Error> {
// .collect::<Vec<_>>();
//let _results = futures::future::join_all(handles).await;
// same thing but iterating over files in rawdata_dir
// uses different futures aggregation method ??
if args.rawdata_dir.is_some() {
let mut fpaths: Vec<_> = std::fs::read_dir(&args.rawdata_dir.unwrap())
.unwrap()
......@@ -94,7 +95,8 @@ pub async fn main() -> Result<(), Error> {
.collect();
fpaths.sort_by_key(|t| t.1.path());
// same thing but iterating over files in rawdata_dir
// uses different futures aggregation method ??
iter(fpaths)
.for_each_concurrent(2, |(d, f)| async move {
decode_insert_msgs(&d, &f.path()).await.expect("decoding")
......@@ -104,11 +106,10 @@ pub async fn main() -> Result<(), Error> {
let elapsed = start.elapsed();
println!(
"total insert time: {} minutes\n",
"total insert time: {} minutes\nvacuuming...",
elapsed.as_secs_f32() / 60.,
);
//let sql = "VACUUM INTO '/run/media/matt/My Passport/test_vacuum_rust.db'";
let sql = "VACUUM";
let mut conn = get_db_conn(&args.dbpath).expect("get db conn");
let tx = conn.transaction().unwrap();
......
use std::time::Instant;
use chrono::MIN_DATETIME;
use rusqlite::{params, Connection, Result, Transaction};
......@@ -233,14 +231,11 @@ pub fn sqlite_insert_dynamic(tx: &Transaction, msgs: Vec<VesselData>, mstr: &str
VALUES (?,?,?,?,?,?,?,?,?,?)",
mstr
);
let start = Instant::now();
let mut stmt = tx
.prepare_cached(sql.as_str())
.expect("preparing statement");
let mut n = 0;
for msg in msgs {
let (p, e) = msg.dynamicdata();
let _ = stmt
......@@ -257,17 +252,8 @@ pub fn sqlite_insert_dynamic(tx: &Transaction, msgs: Vec<VesselData>, mstr: &str
p.timestamp_seconds,
])
.expect("executing prepared row");
n += 1;
}
let elapsed = start.elapsed();
println!(
"inserted: {} msgs/s elapsed: {}s count: {}",
n as f32 / elapsed.as_secs_f32(),
elapsed.as_secs_f32(),
n,
);
Ok(())
}
......@@ -276,7 +262,6 @@ pub fn sqlite_insert_dynamic(tx: &Transaction, msgs: Vec<VesselData>, mstr: &str
#[cfg(test)]
mod tests {
use std::path::Path;
use std::path::PathBuf;
use super::Result;
use crate::decodemsgs;
......@@ -287,15 +272,6 @@ mod tests {
use crate::sqlite_insert_dynamic;
use crate::sqlite_insert_static;
fn testing_dbpaths() -> [std::path::PathBuf; 2] {
[
Path::new(":memory:").to_path_buf(),
[std::env::current_dir().unwrap().to_str().unwrap(), "ais.db"]
.iter()
.collect::<PathBuf>(),
]
}
#[test]
fn test_create_statictable() -> Result<()> {
let mstr = "00test00";
......
......@@ -155,6 +155,7 @@ pub async fn decode_insert_msgs(
) -> Result<(), Error> {
let fstr = &filename.to_str().unwrap();
assert_eq!(&fstr[&fstr.len() - 4..], ".nm4");
let start = Instant::now();
let reader = BufReader::new(
File::open(filename)
......@@ -163,6 +164,7 @@ pub async fn decode_insert_msgs(
let mut parser = NmeaParser::new();
let mut stat_msgs = <Vec<VesselData>>::new();
let mut positions = <Vec<VesselData>>::new();
let mut count = 0;
let mut c = get_db_conn(dbpath).expect("getting db conn");
......@@ -181,8 +183,10 @@ pub async fn decode_insert_msgs(
if is_dynamic {
positions.push(message);
count += 1;
} else {
stat_msgs.push(message);
count += 1;
}
if positions.len() >= 500000 {
......@@ -197,24 +201,26 @@ pub async fn decode_insert_msgs(
};
}
println!(
"{} dynamic: {} static: {}",
filename.to_str().unwrap().rsplit_once('/').unwrap().1,
positions.len(),
stat_msgs.len(),
);
// insert static and remaining dynamic
let mstr1 = epoch_2_dt(*positions[0].epoch.as_ref().unwrap() as i64)
.format("%Y%m")
.to_string();
//let t1 = c.transaction().expect("create tx");
let t1 = c.transaction().unwrap();
let _d1 = sqlite_insert_dynamic(&t1, positions, &mstr1).expect("inserting chunk");
let _c1 = sqlite_createtable_staticreport(&t1, &mstr1).expect("create static table");
let _s1 = sqlite_insert_static(&t1, stat_msgs, &mstr1).expect("insert");
let _ = t1.commit();
let elapsed = start.elapsed();
println!(
"{} count:{: >8} elapsed: {:0.4 }s rate: {:.0}/s",
filename.to_str().unwrap().rsplit_once('/').unwrap().1,
count,
elapsed.as_secs_f32(),
count as f32 / elapsed.as_secs_f32(),
);
Ok(())
}
......
......@@ -2,6 +2,12 @@
Changelog
=========
v1.0.21
-------
improved status messages when decoding
v1.0.20
-------
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment