class
      
      
        TopicConfigManager extends Logging
      
      
      
          
        
      
      
        
        
              Instance Constructors
              - 
      
      
      
      
        
        new
      
      
        TopicConfigManager(zkClient: ZkClient, logManager: LogManager, changeExpirationMs: Long = 15.*(60).*(1000).toLong, time: Time = kafka.utils.SystemTime)
      
      
 
        
        
        
              Value Members
              - 
      
      
      
      
        final 
        def
      
      
        !=(arg0: AnyRef): Boolean
      
      
- 
      
      
      
      
        final 
        def
      
      
        !=(arg0: Any): Boolean
      
      
- 
      
      
      
      
        final 
        def
      
      
        ##(): Int
      
      
- 
      
      
      
      
        final 
        def
      
      
        ==(arg0: AnyRef): Boolean
      
      
- 
      
      
      
      
        final 
        def
      
      
        ==(arg0: Any): Boolean
      
      
- 
      
      
      
      
    
- 
      
      
      
      
        final 
        def
      
      
        asInstanceOf[T0]: T0
      
      
- 
      
      
      
      
        
        def
      
      
        clone(): AnyRef
      
      
- 
      
      
      
      
        
        def
      
      
        debug(msg: ⇒ String, e: ⇒ Throwable): Unit
      
      
- 
      
      
      
      
        
        def
      
      
        debug(e: ⇒ Throwable): Any
      
      
- 
      
      
      
      
        
        def
      
      
        debug(msg: ⇒ String): Unit
      
      
- 
      
      
      
      
        final 
        def
      
      
        eq(arg0: AnyRef): Boolean
      
      
- 
      
      
      
      
        
        def
      
      
        equals(arg0: Any): Boolean
      
      
- 
      
      
      
      
        
        def
      
      
        error(msg: ⇒ String, e: ⇒ Throwable): Unit
      
      
- 
      
      
      
      
        
        def
      
      
        error(e: ⇒ Throwable): Any
      
      
- 
      
      
      
      
        
        def
      
      
        error(msg: ⇒ String): Unit
      
      
- 
      
      
      
      
        
        def
      
      
        fatal(msg: ⇒ String, e: ⇒ Throwable): Unit
      
      
- 
      
      
      
      
        
        def
      
      
        fatal(e: ⇒ Throwable): Any
      
      
- 
      
      
      
      
        
        def
      
      
        fatal(msg: ⇒ String): Unit
      
      
- 
      
      
      
      
        
        def
      
      
        finalize(): Unit
      
      
- 
      
      
      
      
        final 
        def
      
      
        getClass(): Class[_]
      
      
- 
      
      
      
      
        
        def
      
      
        hashCode(): Int
      
      
- 
      
      
      
      
        
        def
      
      
        info(msg: ⇒ String, e: ⇒ Throwable): Unit
      
      
- 
      
      
      
      
        
        def
      
      
        info(e: ⇒ Throwable): Any
      
      
- 
      
      
      
      
        
        def
      
      
        info(msg: ⇒ String): Unit
      
      
- 
      
      
      
      
        final 
        def
      
      
        isInstanceOf[T0]: Boolean
      
      
- 
      
      
      
      
        
        var
      
      
        logIdent: String
      
      
- 
      
      
      
      
        
        lazy val
      
      
        logger: Logger
      
      
- 
      
      
      
      
        
        val
      
      
        loggerName: String
      
      
- 
      
      
      
      
        final 
        def
      
      
        ne(arg0: AnyRef): Boolean
      
      
- 
      
      
      
      
        final 
        def
      
      
        notify(): Unit
      
      
- 
      
      
      
      
        final 
        def
      
      
        notifyAll(): Unit
      
      
- 
      
      
      
      
        
        def
      
      
        startup(): Unit
      
      
- 
      
      
      
      
        
        def
      
      
        swallow(action: ⇒ Unit): Unit
      
      
- 
      
      
      
      
        
        def
      
      
        swallowDebug(action: ⇒ Unit): Unit
      
      
- 
      
      
      
      
        
        def
      
      
        swallowError(action: ⇒ Unit): Unit
      
      
- 
      
      
      
      
        
        def
      
      
        swallowInfo(action: ⇒ Unit): Unit
      
      
- 
      
      
      
      
        
        def
      
      
        swallowTrace(action: ⇒ Unit): Unit
      
      
- 
      
      
      
      
        
        def
      
      
        swallowWarn(action: ⇒ Unit): Unit
      
      
- 
      
      
      
      
        final 
        def
      
      
        synchronized[T0](arg0: ⇒ T0): T0
      
      
- 
      
      
      
      
        
        def
      
      
        toString(): String
      
      
- 
      
      
      
      
        
        def
      
      
        trace(msg: ⇒ String, e: ⇒ Throwable): Unit
      
      
- 
      
      
      
      
        
        def
      
      
        trace(e: ⇒ Throwable): Any
      
      
- 
      
      
      
      
        
        def
      
      
        trace(msg: ⇒ String): Unit
      
      
- 
      
      
      
      
        final 
        def
      
      
        wait(): Unit
      
      
- 
      
      
      
      
        final 
        def
      
      
        wait(arg0: Long, arg1: Int): Unit
      
      
- 
      
      
      
      
        final 
        def
      
      
        wait(arg0: Long): Unit
      
      
- 
      
      
      
      
        
        def
      
      
        warn(msg: ⇒ String, e: ⇒ Throwable): Unit
      
      
- 
      
      
      
      
        
        def
      
      
        warn(e: ⇒ Throwable): Any
      
      
- 
      
      
      
      
        
        def
      
      
        warn(msg: ⇒ String): Unit
      
      
 
        
        
         
        
        
              
Inherited from AnyRef
            
              
Inherited from Any
            
        
         
        
       
      
      
      
     
      
This class initiates and carries out topic config changes.
It works as follows.
Config is stored under the path /brokers/topics/<topic_name>/config. This znode stores the topic overrides for this topic (but no defaults) in properties format.
To avoid watching all topics for changes, we instead have a notification path, /brokers/config_changes. The TopicConfigManager has a child watch on this path.
To update a topic config we first update the topic config properties. Then we create a new sequential znode under the change path which contains the name of the topic that was updated, say /brokers/config_changes/config_change_13321. This is just a notification; the actual config change is stored only once, under the /brokers/topics/<topic_name>/config path.
This will fire a watcher on all brokers. This watcher works as follows. It reads all the config change notifications. It keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds, it checks whether the notification is older than a static expiration time (say 10 minutes) and, if so, deletes the notification. For any new changes it reads the new configuration, combines it with the defaults, and updates the log config for all logs for that topic (if any) that it has.
Note that config is always read from the config path in zk; the notification is just a trigger to do so. So if a broker is down and misses a change, that is fine: when it restarts it will be loading the full config anyway. Note also that if there are two consecutive config changes it is possible that only the last one will be applied (since by the time the broker reads the config, both changes may already have been made). In this case the broker would needlessly refresh the config twice, but that is harmless.
On restart the config manager re-processes all notifications. This will usually be wasted work, but avoids any race conditions on startup where a change might be missed between the initial config load and registering for change notifications.