Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions iotdb-client/client-cpp/src/main/AbstractSessionBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,33 @@

class AbstractSessionBuilder {
public:
std::string host = "localhost";
int rpcPort = 6667;
std::string username = "root";
std::string password = "root";
static constexpr const char* DEFAULT_HOST = "localhost";
static constexpr int DEFAULT_RPC_PORT = 6667;
static constexpr const char* DEFAULT_USERNAME = "root";
static constexpr const char* DEFAULT_PASSWORD = "root";
static constexpr int DEFAULT_FETCH_SIZE = 10000;
static constexpr int DEFAULT_CONNECT_TIMEOUT_MS = 3 * 1000;
static constexpr int DEFAULT_MAX_RETRIES = 3;
static constexpr int DEFAULT_RETRY_DELAY_MS = 500;
static constexpr const char* DEFAULT_SQL_DIALECT = "tree";
static constexpr bool DEFAULT_ENABLE_AUTO_FETCH = true;
static constexpr bool DEFAULT_ENABLE_REDIRECTIONS = true;
static constexpr bool DEFAULT_ENABLE_RPC_COMPRESSION = false;

std::string host = DEFAULT_HOST;
int rpcPort = DEFAULT_RPC_PORT;
std::string username = DEFAULT_USERNAME;
std::string password = DEFAULT_PASSWORD;
std::string zoneId = "";
int fetchSize = 10000;
int connectTimeoutMs = 3 * 1000;
int maxRetries = 3;
int retryDelayMs = 500;
std::string sqlDialect = "tree";
int fetchSize = DEFAULT_FETCH_SIZE;
int connectTimeoutMs = DEFAULT_CONNECT_TIMEOUT_MS;
int maxRetries = DEFAULT_MAX_RETRIES;
int retryDelayMs = DEFAULT_RETRY_DELAY_MS;
std::string sqlDialect = DEFAULT_SQL_DIALECT;
std::string database = "";
bool enableAutoFetch = true;
bool enableRedirections = true;
bool enableRPCCompression = false;
bool enableAutoFetch = DEFAULT_ENABLE_AUTO_FETCH;
bool enableRedirections = DEFAULT_ENABLE_REDIRECTIONS;
bool enableRPCCompression = DEFAULT_ENABLE_RPC_COMPRESSION;
std::vector<std::string> nodeUrls;
};

Expand Down
16 changes: 5 additions & 11 deletions iotdb-client/client-cpp/src/main/NodesSupplier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <iostream>
#include <utility>

const std::string NodesSupplier::SHOW_DATA_NODES_COMMAND = "SHOW DATANODES";
const std::string NodesSupplier::SHOW_AVAILABLE_URLS_COMMAND = "SHOW AVAILABLE URLS";
const std::string NodesSupplier::RUNNING_STATUS = "Running";
const std::string NodesSupplier::STATUS_COLUMN_NAME = "Status";
const std::string NodesSupplier::IP_COLUMN_NAME = "RpcAddress";
Expand Down Expand Up @@ -160,21 +160,19 @@ std::vector<TEndPoint> NodesSupplier::fetchLatestEndpoints() {
client_->init(userName_, password_, enableRPCCompression_, zoneId_, version);
}

auto sessionDataSet = client_->executeQueryStatement(SHOW_DATA_NODES_COMMAND);
auto sessionDataSet = client_->executeQueryStatement(SHOW_AVAILABLE_URLS_COMMAND);

uint32_t columnAddrIdx = -1, columnPortIdx = -1, columnStatusIdx = -1;
uint32_t columnAddrIdx = -1, columnPortIdx = -1;
auto columnNames = sessionDataSet->getColumnNames();
for (uint32_t i = 0; i < columnNames.size(); i++) {
if (columnNames[i] == IP_COLUMN_NAME) {
columnAddrIdx = i;
} else if (columnNames[i] == PORT_COLUMN_NAME) {
columnPortIdx = i;
} else if (columnNames[i] == STATUS_COLUMN_NAME) {
columnStatusIdx = i;
}
}

if (columnAddrIdx == -1 || columnPortIdx == -1 || columnStatusIdx == -1) {
if (columnAddrIdx == -1 || columnPortIdx == -1) {
throw IoTDBException("Required columns not found in query result.");
}

Expand All @@ -183,19 +181,15 @@ std::vector<TEndPoint> NodesSupplier::fetchLatestEndpoints() {
auto record = sessionDataSet->next();
std::string ip;
int32_t port = 0;
std::string status;

if (record->fields.at(columnAddrIdx).stringV.is_initialized()) {
ip = record->fields.at(columnAddrIdx).stringV.value();
}
if (record->fields.at(columnPortIdx).intV.is_initialized()) {
port = record->fields.at(columnPortIdx).intV.value();
}
if (record->fields.at(columnStatusIdx).stringV.is_initialized()) {
status = record->fields.at(columnStatusIdx).stringV.value();
}

if (ip == "0.0.0.0" || status != RUNNING_STATUS) {
if (ip == "0.0.0.0") {
log_warn("Skipping invalid node: " + ip + ":" + std::to_string(port));
continue;
}
Expand Down
2 changes: 1 addition & 1 deletion iotdb-client/client-cpp/src/main/NodesSupplier.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class StaticNodesSupplier : public INodesSupplier {

class NodesSupplier : public INodesSupplier {
public:
static const std::string SHOW_DATA_NODES_COMMAND;
static const std::string SHOW_AVAILABLE_URLS_COMMAND;
static const std::string RUNNING_STATUS;
static const std::string STATUS_COLUMN_NAME;
static const std::string IP_COLUMN_NAME;
Expand Down
5 changes: 5 additions & 0 deletions iotdb-client/client-cpp/src/main/SessionBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ class SessionBuilder : public AbstractSessionBuilder {
}

std::shared_ptr<Session> build() {
if (!AbstractSessionBuilder::nodeUrls.empty() &&
(AbstractSessionBuilder::host != DEFAULT_HOST ||
AbstractSessionBuilder::rpcPort != DEFAULT_RPC_PORT)) {
throw IoTDBException("Session builder does not support setting node urls and host/rpcPort at the same time.");
}
sqlDialect = "tree";
auto newSession = std::make_shared<Session>(this);
newSession->open(false);
Expand Down
5 changes: 5 additions & 0 deletions iotdb-client/client-cpp/src/main/TableSessionBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ class TableSessionBuilder : public AbstractSessionBuilder {
return this;
}
std::shared_ptr<TableSession> build() {
if (!AbstractSessionBuilder::nodeUrls.empty() &&
(AbstractSessionBuilder::host != DEFAULT_HOST ||
AbstractSessionBuilder::rpcPort != DEFAULT_RPC_PORT)) {
throw IoTDBException("Session builder does not support setting node urls and host/rpcPort at the same time.");
}
sqlDialect = "table";
auto newSession = std::make_shared<Session>(this);
newSession->open(false);
Expand Down
1 change: 0 additions & 1 deletion iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import numpy as np


# Serialized tsBlock:
# +-------------+---------------+---------+------------+-----------+----------+
# | val col cnt | val col types | pos cnt | encodings | time col | val col |
Expand Down
Loading