You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

54 KiB

None <html> <head> </head>

Benchmark: ClickHouse Vs. InfluxDB Vs. Postgresql Vs. Parquet


How to use:

  • Rename the file "properties-model.ini" to "properties.ini"
  • Fill with your own credentials

The purpose of this work is to compare the read/write speed of a mid-sized dataset (9 columns and 50,000 lines) against four different databases:

  • ClickHouse
  • InfluxDB
  • Postgresql
  • Parquet (in a S3 Minio Storage)
    ToDo:
  • DuckDB with Polars
  • MongoDB
  • Kdb+

Deve-se relevar: é uma "cold-storage" ou "freeze-storage"?
influxdb: alta leitura e possui a vantagem da indexação para visualização de dados em gráficos.

notas:

  • comparar tamanho do csv com parquet

Imports

In [18]:
import configparser
import io
from datetime import datetime

import duckdb
import influxdb_client
import pandas as pd

# import pymongo
from clickhouse_driver import Client
from dotenv import load_dotenv
from minio import Minio
from minio.error import S3Error
from pymongo import MongoClient
from pytz import timezone
from sqlalchemy import create_engine

load_dotenv()
Out[18]:
False
In [1]:
# Variables
dbname = "EURUSDtest"  # table/collection name reused across all backends
In [5]:
# Load per-backend credentials from properties.ini
# (rename properties-model.ini and fill in your own values first).
arq = configparser.RawConfigParser()
arq.read("properties.ini")
get = arq.get  # shorthand: get(section, option)

ClickHouseUser = get("CLICKHOUSE", "user")
ClickHouseKey = get("CLICKHOUSE", "key")
ClickHouseUrl = get("CLICKHOUSE", "url")

InfluxDBUser = get("INFLUXDB", "user")
InfluxDBKey = get("INFLUXDB", "key")
InfluxDBUrl = get("INFLUXDB", "url")
InfluxDBBucket = get("INFLUXDB", "bucket")

PostgresqlUser = get("POSTGRESQL", "user")
PostgresqlKey = get("POSTGRESQL", "key")
PostgresqlUrl = get("POSTGRESQL", "url")
PostgresqlDB = get("POSTGRESQL", "database")

S3MinioUser = get("S3MINIO", "user")
S3MinioKey = get("S3MINIO", "key")
S3MinioUrl = get("S3MINIO", "url")
S3MinioRegion = get("S3MINIO", "region")

MongoUser = get("MONGODB", "user")
MongoKey = get("MONGODB", "key")
MongoUrl = get("MONGODB", "url")
In [ ]:
%%time
# Load Dataset (first CSV column is used as the index)
df = pd.read_csv("out.csv", index_col=0)
In [ ]:
# df.head()
In [ ]:
# Transform: convert the epoch-second columns to pandas datetimes.
df["from"] = pd.to_datetime(df["from"], unit="s")
df["to"] = pd.to_datetime(df["to"], unit="s")
# Optional: only needed when the CSV has not been transformed yet.

Funções

-> Class

In [ ]:
def timestamp2dataHora(x, timezone_="America/Sao_Paulo"):
    """Convert a Unix timestamp (seconds) to a timezone-aware datetime.

    x: seconds since the epoch.
    timezone_: pytz timezone name; defaults to America/Sao_Paulo.
    """
    return datetime.fromtimestamp(x, tz=timezone(timezone_))

ClickHouse

In [ ]:
# NOTE: the official client uses an HTTP driver; here we use the recommended
# third-party clickhouse_driver library, which speaks native TCP instead.
client = Client(
    host=ClickHouseUrl,
    user=ClickHouseUser,
    password=ClickHouseKey,
    settings={"use_numpy": True},  # needed for insert_dataframe/query_dataframe
)
In [ ]:
# Create the table in ClickHouse (idempotent).
# !! TODO: revisit the column types !!
# ENGINE 'Memory' would vanish when the server restarts, hence MergeTree.
# NOTE(review): unquoted column names `from`/`to`/`min`/`max` collide with SQL
# keywords — confirm this DDL actually runs on the target ClickHouse version.
client.execute(
    "CREATE TABLE IF NOT EXISTS {} (id UInt32,"
    "from DateTime, at UInt64, to DateTime, open Float64,"
    "close Float64, min Float64, max  Float64, volume UInt32)"
    "ENGINE MergeTree ORDER BY to".format(dbname)
)
In [ ]:
%%time
# Write dataframe to db (bulk insert; requires use_numpy=True on the client)
client.insert_dataframe("INSERT INTO {} VALUES".format(dbname), df)
In [ ]:
%%time
client.query_dataframe("SELECT * FROM default.{}".format(dbname))  # LIMIT 10000
In [ ]:
# %%time
# df = pd.DataFrame(client.query_dataframe("SELECT * FROM default.{}".format(dbname)))

InfluxDB

In [ ]:
# InfluxDB client; the org is taken from the configured user.
client = influxdb_client.InfluxDBClient(
    url=InfluxDBUrl, token=InfluxDBKey, org=InfluxDBUser
)
In [ ]:
# Read data from CSV without an index and parse the 'from' column as dates.
df = pd.read_csv("out.csv", sep=",", index_col=False, parse_dates=["from"])
# The InfluxDB dataframe writer requires a DatetimeIndex, so index on 'from'.
df.set_index("from", inplace=True)  # TODO: experiment with other indexes
In [ ]:
df.head()
In [ ]:
%%time
# Writing... it was slow, but it worked.
with client.write_api() as writer:
    writer.write(
        bucket=InfluxDBBucket,
        record=df,
        data_frame_measurement_name="id",
        data_frame_tag_columns=["volume"],
    )
In [ ]:
# data
#   |> pivot(
#     rowKey:["_time"],
#     columnKey: ["_field"],
#     valueColumn: "_value"
#   )
In [ ]:
# Read

Postgresql

In [ ]:
# Connect to Postgres (psycopg2 driver); the table is created below.
engine = create_engine(
    "postgresql+psycopg2://{}:{}@{}:5432/{}".format(
        PostgresqlUser, PostgresqlKey, PostgresqlUrl, PostgresqlDB
    )
)
In [ ]:
# Drop any old table and create a new empty one with the dataframe's schema
# (head(0) carries column names/dtypes but no rows).
df.head(0).to_sql("comparedbs", engine, if_exists="replace", index=False)
In [ ]:
%%time
# Write: bulk-load the dataframe into Postgres via COPY (much faster than
# per-row INSERTs). Note: `io` must be imported at the top of the file —
# the original cell used io.StringIO without importing it (NameError).
conn = engine.raw_connection()
try:
    cur = conn.cursor()
    try:
        buffer = io.StringIO()
        # COPY expects tab-separated rows without header or index.
        df.to_csv(buffer, sep="\t", header=False, index=False)
        buffer.seek(0)
        cur.copy_from(buffer, "comparedbs")  # , null="")  # null values become ''
        conn.commit()
    finally:
        cur.close()
finally:
    conn.close()
In [ ]:
# Read

S3 Parquet

In [ ]:
# TODO: try without the function wrapper to see if it gets faster
# make sure the git-folder files are on the SSD
def main():
    """Upload data/data.parquet to the MinIO bucket 'data'."""
    # SECURITY: credentials come from properties.ini (loaded above) —
    # never hard-code access/secret keys in source.
    client = Minio(
        S3MinioUrl,
        secure=False,
        region=S3MinioRegion,
        access_key=S3MinioUser,
        secret_key=S3MinioKey,
    )

    # Make the bucket if it does not exist.
    found = client.bucket_exists("data")
    if not found:
        client.make_bucket("data")
    else:
        print("Bucket 'data' already exists")

    # Upload the local file as object 'data.parquet' in bucket 'data'.
    client.fput_object(
        "data",
        "data.parquet",
        "data/data.parquet",
    )
    # print(
    #     "'data/data.parquet' is successfully uploaded as "
    #     "object 'data.parquet' to bucket 'data'."
    # )
In [ ]:
%%time
# Write the dataframe to a local parquet file, then upload it to MinIO.
df.to_parquet("data/data.parquet")
if __name__ == "__main__":
    try:
        main()
    except S3Error as exc:  # requires `from minio.error import S3Error` at the top
        print("error occurred.", exc)
In [ ]:
# Read the parquet file back from local disk (pyarrow engine).
pq = pd.read_parquet("data/data.parquet", engine="pyarrow")
pq.head()

MongoDB

In [10]:
# Load csv dataset (default integer index; all columns kept)
data = pd.read_csv("out.csv")
In [36]:
# Connect to MongoDB; authenticate against the 'admin' database,
# default database in the URI is EURUSDtest.
client = MongoClient(
    # "mongodb://192.168.1.133:27017"
    "mongodb://{}:{}@{}/EURUSDtest?retryWrites=true&w=majority".format(
        MongoUser, MongoKey, MongoUrl
    ),
    authSource="admin",
)
In [37]:
# Select database and collection, and flatten the dataframe to documents.
# FIX(review): the original used "EUROUSDtest", which silently creates a second
# database distinct from the "EURUSDtest" named in the connection URI and in
# `dbname` — almost certainly a typo, so use `dbname` instead.
db = client[dbname]
collection = db["finance"]
# data.reset_index(inplace=True)
data_dict = data.to_dict("records")  # one dict per row
In [38]:
%%time
# Insert all row-documents into the collection in one bulk call.
collection.insert_many(data_dict)
CPU times: user 19.2 s, sys: 269 ms, total: 19.5 s
Wall time: 50.1 s
Out[38]:
<pymongo.results.InsertManyResult at 0x7f8e39c991e0>
In [ ]:
# read

DuckDB

In [39]:
# Smoke-test: open an in-memory DuckDB connection and run a trivial query.
cursor = duckdb.connect()
print(cursor.execute("SELECT 42").fetchall())
[(42,)]
In [45]:
%%time
conn = duckdb.connect()  # fresh in-memory database
data = pd.read_csv("out.csv")
# Register the dataframe as a virtual table named EURUSDtest (no copy made).
conn.register("EURUSDtest", data)
CPU times: user 1.53 s, sys: 63.6 ms, total: 1.59 s
Wall time: 1.59 s
Out[45]:
<duckdb.DuckDBPyConnection at 0x7f8e3b2261b0>
In [47]:
display(conn.execute("SHOW TABLES").df())
name
In [46]:
%%time
# Materialize the whole virtual table back into a pandas dataframe.
df = conn.execute("SELECT * FROM EURUSDtest").df()
df
Out[46]:
Unnamed: 0 id from at to open close min max volume
0 0 7730801 2023-01-02 15:58:45 1672675140000000000 2023-01-02 15:59:00 1.065995 1.066035 1.065930 1.066070 57
1 1 7730802 2023-01-02 15:59:00 1672675155000000000 2023-01-02 15:59:15 1.066055 1.066085 1.066005 1.066115 52
2 2 7730803 2023-01-02 15:59:15 1672675170000000000 2023-01-02 15:59:30 1.066080 1.066025 1.066025 1.066110 57
3 3 7730804 2023-01-02 15:59:30 1672675185000000000 2023-01-02 15:59:45 1.065980 1.065985 1.065885 1.066045 64
4 4 7730805 2023-01-02 15:59:45 1672675200000000000 2023-01-02 16:00:00 1.065975 1.066055 1.065830 1.066055 50
... ... ... ... ... ... ... ... ... ... ...
999995 999995 7984748 2023-03-03 18:13:30 1677867225000000000 2023-03-03 18:13:45 1.062695 1.062635 1.062630 1.062700 64
999996 999996 7984749 2023-03-03 18:13:45 1677867240000000000 2023-03-03 18:14:00 1.062645 1.062650 1.062625 1.062650 43
999997 999997 7984750 2023-03-03 18:14:00 1677867255000000000 2023-03-03 18:14:15 1.062640 1.062625 1.062620 1.062665 47
999998 999998 7984751 2023-03-03 18:14:15 1677867270000000000 2023-03-03 18:14:30 1.062625 1.062535 1.062535 1.062645 43
999999 999999 7984752 2023-03-03 18:14:30 1677867285000000000 2023-03-03 18:14:45 1.062535 1.062520 1.062520 1.062580 59

1000000 rows × 10 columns

Kdb+

In [66]:
import numpy as np

# HACK: qpython still references the deprecated alias `np.bool`, which was
# removed in NumPy 1.24 — restore it (only when missing) before importing
# qpython, so this cell also works on older NumPy versions.
if not hasattr(np, "bool"):
    np.bool = np.bool_
from qpython import qconnection
In [49]:
# read csv (same benchmark dataset)
data = pd.read_csv("out.csv")
In [50]:
# Open a connection to the local kdb+/q server.
q = qconnection.QConnection(host="localhost", port=5001)
q.open()
In [51]:
# send df to kd+ in memory bank
%%time
q.sendSync("{t::x}", data)
CPU times: user 837 ms, sys: 40 ms, total: 877 ms
Wall time: 1.16 s
In [52]:
# Persist the in-memory table `t` to an on-disk kdb+ table.
q.sendSync("`:/home/sandman/q/tab1 set t")
Out[52]:
b':/home/sandman/q/tab1'
In [53]:
%%time
# Load the on-disk table back (server-side, into the q variable `tab2`).
df2 = q.sendSync("tab2: get `:/home/sandman/q/tab1")
CPU times: user 1.78 ms, sys: 2 µs, total: 1.79 ms
Wall time: 329 ms
In [54]:
# print(df2)
In [55]:
%%time
# Pull the server-side table `tab2` down into the Python variable df2.
df2 = q.sendSync("tab2")
CPU times: user 957 ms, sys: 87.9 ms, total: 1.05 s
Wall time: 1.13 s
In [58]:
# df2(type)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[58], line 1
----> 1 df2(type)

TypeError: 'QTable' object is not callable
In [67]:
%%time
# Convert the q table `t` to a pandas dataframe.
# NOTE(review): this raised an AttributeError in qpython's pandas reader
# (see traceback below) — likely a qpython/pandas version incompatibility.
df = pd.DataFrame(q("t", pandas=True))
df.head()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
File <timed exec>:2

File ~/dev/pipenv/lib/python3.10/site-packages/qpython/qconnection.py:385, in QConnection.__call__(self, *parameters, **options)
    384 def __call__(self, *parameters, **options):
--> 385     return self.sendSync(parameters[0], *parameters[1:], **options)

File ~/dev/pipenv/lib/python3.10/site-packages/qpython/qconnection.py:303, in QConnection.sendSync(self, query, *parameters, **options)
    249 '''Performs a synchronous query against a q service and returns parsed 
    250 data.
    251 
   (...)
    300          :class:`.QReaderException`
    301 '''
    302 self.query(MessageType.SYNC, query, *parameters, **options)
--> 303 response = self.receive(data_only = False, **options)
    305 if response.type == MessageType.RESPONSE:
    306     return response.data

File ~/dev/pipenv/lib/python3.10/site-packages/qpython/qconnection.py:380, in QConnection.receive(self, data_only, **options)
    341 def receive(self, data_only = True, **options):
    342     '''Reads and (optionally) parses the response from a q service.
    343     
    344     Retrieves query result along with meta-information:
   (...)
    378     :raises: :class:`.QReaderException`
    379     '''
--> 380     result = self._reader.read(**self._options.union_dict(**options))
    381     return result.data if data_only else result

File ~/dev/pipenv/lib/python3.10/site-packages/qpython/qreader.py:139, in QReader.read(self, source, **options)
    120 '''
    121 Reads and optionally parses a single message.
    122 
   (...)
    136           with meta information
    137 '''
    138 message = self.read_header(source)
--> 139 message.data = self.read_data(message.size, message.is_compressed, **options)
    141 return message

File ~/dev/pipenv/lib/python3.10/site-packages/qpython/qreader.py:216, in QReader.read_data(self, message_size, is_compressed, **options)
    213 if not self._stream and self._options.raw:
    214     raw_data = self._buffer.raw(message_size - 8)
--> 216 return raw_data if self._options.raw else self._read_object()

File ~/dev/pipenv/lib/python3.10/site-packages/qpython/qreader.py:225, in QReader._read_object(self)
    222 reader = self._get_reader(qtype)
    224 if reader:
--> 225     return reader(self, qtype)
    226 elif qtype >= QBOOL_LIST and qtype <= QTIME_LIST:
    227     return self._read_list(qtype)

File ~/dev/pipenv/lib/python3.10/site-packages/qpython/_pandas.py:76, in PandasQReader._read_table(self, qtype)
     74 columns = self._read_object()
     75 self._buffer.skip() # ignore generic list type indicator
---> 76 data = QReader._read_general_list(self, qtype)
     78 odict = OrderedDict()
     79 meta = MetaData(qtype = QTABLE)

File ~/dev/pipenv/lib/python3.10/site-packages/qpython/qreader.py:338, in QReader._read_general_list(self, qtype)
    335 self._buffer.skip()  # ignore attributes
    336 length = self._buffer.get_int()
--> 338 return [self._read_object() for x in range(length)]

File ~/dev/pipenv/lib/python3.10/site-packages/qpython/qreader.py:338, in <listcomp>(.0)
    335 self._buffer.skip()  # ignore attributes
    336 length = self._buffer.get_int()
--> 338 return [self._read_object() for x in range(length)]

File ~/dev/pipenv/lib/python3.10/site-packages/qpython/qreader.py:227, in QReader._read_object(self)
    225     return reader(self, qtype)
    226 elif qtype >= QBOOL_LIST and qtype <= QTIME_LIST:
--> 227     return self._read_list(qtype)
    228 elif qtype <= QBOOL and qtype >= QTIME:
    229     return self._read_atom(qtype)

File ~/dev/pipenv/lib/python3.10/site-packages/qpython/_pandas.py:116, in PandasQReader._read_list(self, qtype)
    114 if -abs(qtype) not in [QMONTH, QDATE, QDATETIME, QMINUTE, QSECOND, QTIME, QTIMESTAMP, QTIMESPAN, QSYMBOL]:
    115     null = QNULLMAP[-abs(qtype)][1]
--> 116     ps = pandas.Series(data = qlist).replace(null, numpy.NaN)
    117 else:
    118     ps = pandas.Series(data = qlist)

File ~/dev/pipenv/lib/python3.10/site-packages/pandas/core/series.py:5219, in Series.replace(self, to_replace, value, inplace, limit, regex, method)
   5203 @doc(
   5204     NDFrame.replace,
   5205     klass=_shared_doc_kwargs["klass"],
   (...)
   5217     method: Literal["pad", "ffill", "bfill"] | lib.NoDefault = lib.no_default,
   5218 ) -> Series | None:
-> 5219     return super().replace(
   5220         to_replace=to_replace,
   5221         value=value,
   5222         inplace=inplace,
   5223         limit=limit,
   5224         regex=regex,
   5225         method=method,
   5226     )

File ~/dev/pipenv/lib/python3.10/site-packages/pandas/core/generic.py:7389, in NDFrame.replace(self, to_replace, value, inplace, limit, regex, method)
   7383         new_data = self._mgr.replace_regex(
   7384             to_replace=to_replace,
   7385             value=value,
   7386             inplace=inplace,
   7387         )
   7388     else:
-> 7389         new_data = self._mgr.replace(
   7390             to_replace=to_replace, value=value, inplace=inplace
   7391         )
   7392 else:
   7393     raise TypeError(
   7394         f'Invalid "to_replace" type: {repr(type(to_replace).__name__)}'
   7395     )

File ~/dev/pipenv/lib/python3.10/site-packages/pandas/core/internals/managers.py:475, in BaseBlockManager.replace(self, to_replace, value, inplace)
    473 assert not is_list_like(to_replace)
    474 assert not is_list_like(value)
--> 475 return self.apply(
    476     "replace",
    477     to_replace=to_replace,
    478     value=value,
    479     inplace=inplace,
    480     using_cow=using_copy_on_write(),
    481 )

File ~/dev/pipenv/lib/python3.10/site-packages/pandas/core/internals/managers.py:352, in BaseBlockManager.apply(self, f, align_keys, **kwargs)
    350         applied = b.apply(f, **kwargs)
    351     else:
--> 352         applied = getattr(b, f)(**kwargs)
    353     result_blocks = extend_blocks(applied, result_blocks)
    355 out = type(self).from_blocks(result_blocks, self.axes)

File ~/dev/pipenv/lib/python3.10/site-packages/pandas/core/internals/blocks.py:593, in Block.replace(self, to_replace, value, inplace, mask, using_cow)
    590         return [self] if inplace else [self.copy()]
    592 if mask is None:
--> 593     mask = missing.mask_missing(values, to_replace)
    594 if not mask.any():
    595     # Note: we get here with test_replace_extension_other incorrectly
    596     #  bc _can_hold_element is incorrect.
    597     if using_cow:

File ~/dev/pipenv/lib/python3.10/site-packages/pandas/core/missing.py:112, in mask_missing(arr, values_to_mask)
    108             new_mask = arr == x
    110             if not isinstance(new_mask, np.ndarray):
    111                 # usually BooleanArray
--> 112                 new_mask = new_mask.to_numpy(dtype=bool, na_value=False)
    113         mask |= new_mask
    115 if na_mask.any():

AttributeError: 'bool' object has no attribute 'to_numpy'
In [19]:
%%time
# Run a server-side select over `t` and fetch the result.
df3 = q.sendSync("select from t")
In [ ]:
q.close()
</html>