  int jpid;                              //JMS Pool Id
  int jcid;                              //JMS Connection Id
  char error_msg[1024 + 1];              // Error string. Must be 1024 size
  int ret;                               // Error code
  char *msg = "message";                 // Message to be produced
  int msg_len = strlen(msg);             // Message Length
  char *key = "message key";            // Message Key
  int key_length = strlen(key);             // Message Length
  
  // Initializes JMS Pool with given pool size and arguments
  // It returns a Pool ID which need to be passed in subsequent api for getting connection from the pool
  // In case of error, it returns error code with error_msg (see help for details)
  // ns_kafka_init_producer(kafka_hostname, kafka_port, kafka_topic, max_pool_size, error_msg)) 
  if((jpid = ns_kafka_init_producer("127.0.0.1", 9092, "topic", 1, "error_msg")) < 0)
  {
  fprintf(stderr, "Error in initializing Kafka producer. Error code = %d, Error Msg = %s", jpid, error_msg);
  return;
  }
  
  // Sets kafka security protocol
  // ns_kafka_set_security_protocol(jpid, security_protocol, error_msg)
  if((ret = ns_kafka_set_security_protocol(jpid, "ssl", error_msg)) < 0)
  {
  fprintf(stderr, "Error in setting Kafka security protocol. Error code = %d, Error Msg = %s", ret, error_msg);
  return;
  }
  
  
  // Sets ssl ciphers in case of SSL protocol
  // ns_kafka_set_ssl_ciphers(jpid, ciphers, error_msg)
  if((ret = ns_kafka_set_ssl_ciphers(jpid, "ciphers", error_msg)) < 0)
  {
  fprintf(stderr, "Error in setting Kafka ssl ciphers. Error code = %d, Error Msg = %s", ret, error_msg);
  return;
  }
  
  // Sets ssl key from the given file path . Here KeyPassword is mandatory
  // ns_kafka_set_ssl_keyFile(jpid, keyFilePath, keyPassword, error_msg)
  if((ret = ns_kafka_set_ssl_key_file(jpid, "/home/keyfile","Password", error_msg)) < 0)
  {
  fprintf(stderr, "Error in setting Kafka ssl key.Error code = %d, Error Msg = %s", ret, error_msg);
  return;
  }
  
  // Sets ssl certificate from the given file path
  // ns_kafka_set_ssl_certFile(jpid, certificateFilePath, error_msg)
  if((ret = ns_kafka_set_ssl_cert_file(jpid, "/home/certificatefile", error_msg)) < 0)
  {
  fprintf(stderr, "Error in setting Kafka ssl certificate. Error code = %d, Error Msg = %s", ret, error_msg);
  return;
  }
  
  // Sets ssl ca certificate from the given file path
  // ns_kafka_set_ssl_ca_file(jpid, caCertifcateFilePath, error_msg)
  if((ret = ns_kafka_set_ssl_ca_file(jpid, "/home/trustedca", error_msg)) < 0)
  {
  fprintf(stderr, "Error in setting Kafka ssl ca certificate.Error code = %d, Error Msg = %s", ret, error_msg);
  return;
  }
  
  // Sets ssl server certificate from the given file path
  // This is needed if verifiy broker is enabled
  // ns_kafka_set_ssl_crl_file(jpid, crlFilePath, error_msg)
  if((ret = ns_kafka_set_ssl_crl_file(jpid, "/home/crlfile", error_msg)) < 0)
  {
  fprintf(stderr, "Error in setting Kafka ssl certificate.Error code = %d, Error Msg = %s", ret, error_msg);
  return;
  }
  
  // Initalizes Producer connection by getting connection from the given pool ID.
  // Returns the connection id  . will reuse connection if free or make new connection  ???
  // In case of error, it returns error code with error_msg (see help for details)
  // If connection is made, then transaction is set with name passed in second argument. for example here it sets KAFKAProducerConnect
  // If connection fails, then transaction is set with <TxName><ErrorMsg>. For example, KAFKAProducerConnectTimeout
  // ns_kafka_get_connection(jpid, "KAFKAProducerConnect", error_msg)
  if((jcid = ns_kafka_get_connection(jpid, "KAFKAProducerConnect", error_msg)) < 0)
  {
  fprintf(stderr, "Error in getting Kafka connention. Error code = %d, Error Msg = %s", jcid, error_msg);
  return;
  }
  
  // Puts message in the JMS queue/topic using connection ID returned from get connection api
  // In case of error, it returns error code with error_msg (see help for details)
  // Here Transaction is set with name passed in sixth argument . for example here it sets KAFKAPutMsg
  // If api fails, then transaction is set with <TxName><ErrorMsg>. For example, KAFKAPutMsgConnectTimeout
  // ns_kafka_put_msg(jcid, msg, msg_len, "KAFKAPutMsg", error_msg)
  // ns_kafka_put_msg_v2(jcid, msg, msg_len, key, key_length, "KAFKAPutMsg", error_msg)
  if((ret = ns_kafka_put_msg_v2(jcid, msg, msg_len, key, key_length, "KAFKAPutMsg", error_msg)) < 0)
  {
  fprintf(stderr, "Error in putting message using Kafka connention. Error code = %d, Error Msg = %s", ret, error_msg);
  }
  
  // Releases the connection from connection pool 
  // It should be called every time so that another user can reuse it.
  // ns_kakfa_release_connection(jcid, error_msg)
  if((ret = ns_kakfa_release_connection(jcid, error_msg)) < 0)
  {
  fprintf(stderr, "Error in releasing Kafka connection.Error code = %d, Error Msg = %s", ret, error_msg);
  return;
  }
  
  /*
  // Closes connection with kafka server 
  // It should be called at the end of test and not every time as it may have performance overhead.
  // ns_kakfa_close_connection(jcid, "KAFKAProducerSSLClose" ,error_msg)
  if((ret = ns_kakfa_close_connection(jcid, "KAFKAProducerSSLClose" ,error_msg)) < 0 )
  {
  fprintf(stderr, "Error in closing Kafka connection %d",ret);
  return;
  }
  
  */
  // Page think time in case of adding delay for next session
  ns_page_think_time(0.0);